Merge branch 'master' into feature/slider-widget

This commit is contained in:
Igor Kulikov 2024-02-14 13:54:20 +02:00
commit bd8a43ceb2
22 changed files with 549 additions and 86 deletions

View File

@ -66,7 +66,6 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNoti
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
@ -90,7 +89,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.gen.transport.TransportProtos.UplinkNotificationMsg;
import org.thingsboard.server.service.rpc.RpcSubmitStrategy;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.Nullable;
@ -173,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private EdgeId findRelatedEdgeId() {
List<EntityRelation> result =
systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON);
systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
if (result != null && result.size() > 0) {
EntityRelation relationToEdge = result.get(0);
if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
@ -212,8 +213,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (systemContext.isEdgesEnabled() && edgeId != null) {
log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId);
try {
saveRpcRequestToEdgeQueue(request, requestId).get();
sent = true;
if (systemContext.getEdgeService().isEdgeActiveAsync(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE).get()) {
saveRpcRequestToEdgeQueue(request, requestId).get();
} else {
log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request);
}
} catch (InterruptedException | ExecutionException e) {
log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e);
}
@ -470,7 +474,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
callback.onSuccess();
}
private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) {
private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, UplinkNotificationMsg uplinkNotificationMsg) {
String nodeId = sessionInfo.getNodeId();
sessions.entrySet().stream()
.filter(kv -> kv.getValue().getSessionInfo().getNodeId().equals(nodeId) && (kv.getValue().isSubscribedToAttributes() || kv.getValue().isSubscribedToRPC()))

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
@ -29,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.relation.EntityRelation;

View File

@ -477,6 +477,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
TbMsgMetaData md = new TbMsgMetaData();
if (!persistToTelemetry) {
md.putValue(DataConstants.SCOPE, DataConstants.SERVER_SCOPE);
md.putValue("edgeName", edge.getName());
md.putValue("edgeType", edge.getType());
}
TbMsg tbMsg = TbMsg.newMsg(msgType, edgeId, md, TbMsgDataType.JSON, data);
clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor.rule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.RuleChain;
@ -53,6 +54,10 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
isRoot = Boolean.parseBoolean(edgeEvent.getBody().get(EDGE_IS_ROOT_BODY_KEY).asText());
} catch (Exception ignored) {}
}
if (!isRoot) {
Edge edge = edgeService.findEdgeById(edgeEvent.getTenantId(), edgeEvent.getEdgeId());
isRoot = edge.getRootRuleChainId().equals(ruleChainId);
}
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor)
ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))

View File

@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

View File

@ -711,7 +711,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(30, TimeUnit.SECONDS));
Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(TIMEOUT, TimeUnit.SECONDS));
Assert.assertEquals(JacksonUtil.newObjectNode().put(attrKey, attrValue),
JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes()));
@ -798,7 +798,21 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
// clean up stored edge events
edgeEventService.cleanupEvents(1);
// perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly
// edge is disconnected: perform rpc call - no edge event saved
doPostAsync(
"/api/rpc/oneway/" + device.getId().getId().toString(),
JacksonUtil.toString(createDefaultRpc()),
String.class,
status().isOk());
Awaitility.await()
.atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> {
PageData<EdgeEvent> result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1));
return result.getTotalElements() == 0;
});
// edge is connected: perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly
simulateEdgeActivation(tmpEdge);
doPostAsync(
"/api/rpc/oneway/" + device.getId().getId().toString(),
JacksonUtil.toString(createDefaultRpc()),
@ -857,4 +871,23 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
return rpc;
}
private void simulateEdgeActivation(Edge edge) throws Exception {
ObjectNode attributes = JacksonUtil.newObjectNode();
attributes.put("active", true);
doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes);
Awaitility.await()
.atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> {
List<Map<String, Object>> values = doGetAsyncTyped("/api/plugins/telemetry/EDGE/" + edge.getId() +
"/values/attributes/SERVER_SCOPE", new TypeReference<>() {});
Optional<Map<String, Object>> activeAttrOpt = values.stream().filter(att -> att.get("key").equals("active")).findFirst();
if (activeAttrOpt.isEmpty()) {
return false;
}
Map<String, Object> activeAttr = activeAttrOpt.get();
return "true".equals(activeAttr.get("value").toString());
});
}
}

View File

@ -146,7 +146,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
}
}
private void createRuleChainMetadata(RuleChain ruleChain) {
private RuleChainMetaData createRuleChainMetadata(RuleChain ruleChain) {
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(ruleChain.getId());
@ -182,7 +182,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
ruleChainMetaData.addConnectionInfo(0, 2, "fail");
ruleChainMetaData.addConnectionInfo(1, 2, "success");
doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class);
return doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class);
}
@Test
@ -193,9 +193,10 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
ruleChain.setType(RuleChainType.EDGE);
RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class);
edgeImitator.expectMessageAmount(1);
edgeImitator.expectMessageAmount(2);
doPost("/api/edge/" + edge.getUuidId()
+ "/ruleChain/" + savedRuleChain.getUuidId(), RuleChain.class);
RuleChainMetaData metaData = createRuleChainMetadata(savedRuleChain);
Assert.assertTrue(edgeImitator.waitForMessages());
// set new rule chain as root
@ -213,6 +214,22 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
Assert.assertTrue(ruleChainMsg.isRoot());
Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId());
// update metadata for root rule chain
edgeImitator.expectMessageAmount(1);
metaData.getNodes().forEach(n -> n.setDebugMode(true));
doPost("/api/ruleChain/metadata", metaData, RuleChainMetaData.class);
Assert.assertTrue(edgeImitator.waitForMessages());
ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class);
Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent());
ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get();
ruleChainMsg = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true);
Assert.assertNotNull(ruleChainMsg);
Assert.assertTrue(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType()) ||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType()));
Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId());
Assert.assertEquals(savedRuleChain.getName(), ruleChainMsg.getName());
Assert.assertTrue(ruleChainMsg.isRoot());
// revert root rule chain
edgeImitator.expectMessageAmount(1);
doPost("/api/edge/" + edge.getUuidId()

View File

@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.edge.rpc.EdgeGrpcClient;
import org.thingsboard.edge.rpc.EdgeRpcClient;
import org.thingsboard.server.controller.AbstractWebTest;
import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
@ -72,8 +73,6 @@ import java.util.stream.Collectors;
@Slf4j
public class EdgeImitator {
public static final int TIMEOUT_IN_SECONDS = 30;
private String routingKey;
private String routingSecret;
@ -344,7 +343,7 @@ public class EdgeImitator {
}
public boolean waitForMessages() throws InterruptedException {
return waitForMessages(TIMEOUT_IN_SECONDS);
return waitForMessages(AbstractWebTest.TIMEOUT);
}
public boolean waitForMessages(int timeoutInSeconds) throws InterruptedException {
@ -359,7 +358,7 @@ public class EdgeImitator {
}
public boolean waitForResponses() throws InterruptedException {
return responsesLatch.await(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
return responsesLatch.await(AbstractWebTest.TIMEOUT, TimeUnit.SECONDS);
}
public void expectResponsesAmount(int messageAmount) {

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.edge.rpc.constructor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
@ -61,7 +60,7 @@ public class RuleChainMsgConstructorTest {
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() throws JsonProcessingException {
public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() {
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(
ruleChainId, 3, createRuleNodes(ruleChainId), createConnections());
@ -80,7 +79,7 @@ public class RuleChainMsgConstructorTest {
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() throws JsonProcessingException {
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() {
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(
ruleChainId, 3, createRuleNodes(ruleChainId), createConnections());
@ -120,7 +119,7 @@ public class RuleChainMsgConstructorTest {
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() throws JsonProcessingException {
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() {
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
@ -161,7 +160,7 @@ public class RuleChainMsgConstructorTest {
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() throws JsonProcessingException {
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() {
// same rule chain metadata, but different order of rule nodes
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMetaData ruleChainMetaData1 = createRuleChainMetaData(ruleChainId, 8, createRuleNodesInDifferentOrder(ruleChainId), createConnectionsInDifferentOrder());
@ -254,7 +253,7 @@ public class RuleChainMsgConstructorTest {
return result;
}
private List<RuleNode> createRuleNodes(RuleChainId ruleChainId) throws JsonProcessingException {
private List<RuleNode> createRuleNodes(RuleChainId ruleChainId) {
List<RuleNode> result = new ArrayList<>();
result.add(getOutputNode(ruleChainId));
result.add(getAcknowledgeNode(ruleChainId));
@ -301,7 +300,7 @@ public class RuleChainMsgConstructorTest {
return result;
}
private List<RuleNode> createRuleNodesInDifferentOrder(RuleChainId ruleChainId) throws JsonProcessingException {
private List<RuleNode> createRuleNodesInDifferentOrder(RuleChainId ruleChainId) {
List<RuleNode> result = new ArrayList<>();
result.add(getPushToAnalyticsNode(ruleChainId));
result.add(getPushToCloudNode(ruleChainId));
@ -319,99 +318,99 @@ public class RuleChainMsgConstructorTest {
}
private RuleNode getOutputNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getOutputNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbRuleChainOutputNode",
"Output node",
JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}"));
JacksonUtil.toJsonNode("{\"version\":0}"),
JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}"));
}
private RuleNode getCheckpointNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getCheckpointNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbCheckpointNode",
"Checkpoint node",
JacksonUtil.OBJECT_MAPPER.readTree("{\"queueName\":\"HighPriority\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}"));
JacksonUtil.toJsonNode("{\"queueName\":\"HighPriority\"}"),
JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}"));
}
private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"Save Timeseries",
JacksonUtil.OBJECT_MAPPER.readTree("{\"defaultTTL\":0}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":823,\"layoutY\":157}"));
JacksonUtil.toJsonNode("{\"defaultTTL\":0}"),
JacksonUtil.toJsonNode("{\"layoutX\":823,\"layoutY\":157}"));
}
private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"Message Type Switch",
JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":347,\"layoutY\":149}"));
JacksonUtil.toJsonNode("{\"version\":0}"),
JacksonUtil.toJsonNode("{\"layoutX\":347,\"layoutY\":149}"));
}
private RuleNode getLogOtherNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getLogOtherNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.action.TbLogNode",
"Log Other",
JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":378}"));
JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":378}"));
}
private RuleNode getPushToCloudNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getPushToCloudNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode",
"Push to cloud",
JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"SERVER_SCOPE\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":1129,\"layoutY\":52}"));
JacksonUtil.toJsonNode("{\"scope\":\"SERVER_SCOPE\"}"),
JacksonUtil.toJsonNode("{\"layoutX\":1129,\"layoutY\":52}"));
}
private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbAckNode",
"Acknowledge node",
JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}"));
JacksonUtil.toJsonNode("{\"version\":0}"),
JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}"));
}
private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"Device Profile Node",
JacksonUtil.OBJECT_MAPPER.readTree("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}"));
JacksonUtil.toJsonNode("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"),
JacksonUtil.toJsonNode("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}"));
}
private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"Save Client Attributes",
JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"CLIENT_SCOPE\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":52}"));
JacksonUtil.toJsonNode("{\"scope\":\"CLIENT_SCOPE\"}"),
JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":52}"));
}
private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.action.TbLogNode",
"Log RPC from Device",
JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":825,\"layoutY\":266}"));
JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
JacksonUtil.toJsonNode("{\"layoutX\":825,\"layoutY\":266}"));
}
private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"RPC Call Request",
JacksonUtil.OBJECT_MAPPER.readTree("{\"timeoutInSeconds\":60}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":466}"));
JacksonUtil.toJsonNode("{\"timeoutInSeconds\":60}"),
JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":466}"));
}
private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) throws JsonProcessingException {
private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbRuleChainInputNode",
"Push to Analytics",
JacksonUtil.OBJECT_MAPPER.readTree("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"),
JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}"));
JacksonUtil.toJsonNode("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"),
JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}"));
}
}

View File

@ -55,6 +55,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
@ -209,6 +210,9 @@ public abstract class BaseEdgeProcessorTest {
@MockBean
protected AttributesService attributesService;
@MockBean
protected TimeseriesService timeseriesService;
@MockBean
protected TbClusterService tbClusterService;

View File

@ -91,4 +91,6 @@ public interface EdgeService extends EntityDaoService {
PageData<EdgeId> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink);
String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName);
ListenableFuture<Boolean> isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String activityState);
}

View File

@ -30,10 +30,10 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.edge.Edge;
@ -48,14 +48,15 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent;
import org.thingsboard.server.dao.exception.DataValidationException;
@ -64,7 +65,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
import org.thingsboard.server.dao.service.Validator;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import javax.annotation.Nullable;
@ -105,7 +106,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
private RelationService relationService;
@Autowired
private TenantService tenantService;
private TimeseriesService timeseriesService;
@Autowired
private AttributesService attributesService;
@Autowired
private DataValidator<Edge> edgeValidator;
@ -113,6 +117,8 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
@Value("${edges.enabled}")
@Getter
private boolean edgesEnabled;
@Value("${edges.state.persistToTelemetry:false}")
private boolean persistToTelemetry;
@TransactionalEventListener(classes = EdgeCacheEvictEvent.class)
@Override
@ -530,6 +536,18 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
return result.toString();
}
@Override
public ListenableFuture<Boolean> isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String key) {
ListenableFuture<? extends Optional<? extends KvEntry>> futureKvEntry;
if (persistToTelemetry) {
futureKvEntry = timeseriesService.findLatest(tenantId, edgeId, key);
} else {
futureKvEntry = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, key);
}
return Futures.transformAsync(futureKvEntry, kvEntryOpt ->
Futures.immediateFuture(kvEntryOpt.flatMap(KvEntry::getBooleanValue).orElse(false)), MoreExecutors.directExecutor());
}
private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
List<RuleChain> result = new ArrayList<>();
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);

View File

@ -40,9 +40,9 @@ import org.thingsboard.server.common.data.notification.rule.trigger.config.Alarm
import org.thingsboard.server.common.data.notification.rule.trigger.config.ApiUsageLimitNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig.DeviceEvent;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig.EdgeConnectivityEvent;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EntitiesLimitNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EntityActionNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NewPlatformVersionNotificationRuleTriggerConfig;
@ -347,7 +347,7 @@ public class DefaultNotifications {
public static final DefaultNotification edgeCommunicationFailures = DefaultNotification.builder()
.name("Edge communication failure notification")
.type(NotificationType.EDGE_COMMUNICATION_FAILURE)
.subject("Edge '${edgeName}' communication failure occured")
.subject("Edge '${edgeName}' communication failure occurred")
.text("Failure message: '${failureMsg}'")
.icon("error").color(RED_COLOR)
.button("Go to Edge").link("/edgeManagement/instances/${edgeId}")

View File

@ -525,12 +525,11 @@ public class RestClient implements Closeable {
}
public PageData<AlarmCommentInfo> getAlarmComments(AlarmId alarmId, PageLink pageLink) {
String urlSecondPart = "/api/alarm/{alarmId}/comment";
Map<String, String> params = new HashMap<>();
params.put("alarmId", alarmId.getId().toString());
addPageLinkToParam(params, pageLink);
return restTemplate.exchange(
baseURL + urlSecondPart + "&" + getUrlParams(pageLink),
baseURL + "/api/alarm/{alarmId}/comment?" + getUrlParams(pageLink),
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<PageData<AlarmCommentInfo>>() {

View File

@ -47,6 +47,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED;
import static org.thingsboard.server.common.data.msg.TbMsgType.TO_SERVER_RPC_REQUEST;
@Slf4j
public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfiguration, S, U> implements TbNode {
@ -176,6 +177,6 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
protected boolean isSupportedMsgType(TbMsg msg) {
return msg.isTypeOneOf(POST_TELEMETRY_REQUEST, POST_ATTRIBUTES_REQUEST, ATTRIBUTES_UPDATED, ATTRIBUTES_DELETED, TIMESERIES_UPDATED,
ALARM, CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT);
ALARM, CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT, TO_SERVER_RPC_REQUEST);
}
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.geo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@ -28,6 +30,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.Collections;
@ -39,6 +42,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE;
/**
* Created by ashvayka on 19.01.18.
*/
@ -46,15 +54,24 @@ import java.util.concurrent.TimeoutException;
@RuleNode(
type = ComponentType.ACTION,
name = "gps geofencing events",
version = 1,
configClazz = TbGpsGeofencingActionNodeConfiguration.class,
relationTypes = {"Success", "Entered", "Left", "Inside", "Outside"},
nodeDescription = "Produces incoming messages using GPS based geofencing",
nodeDetails = "Extracts latitude and longitude parameters from incoming message and returns different events based on configuration parameters",
nodeDetails = "Extracts latitude and longitude parameters from incoming message and returns different events based on configuration parameters. " +
"<br><br>" +
"If an object with coordinates extracted from incoming message enters the geofence, sends a message with the type <code>Entered</code>. " +
"If an object leaves the geofence, sends a message with the type <code>Left</code>. " +
"If the presence monitoring strategy <b>\"On first message\"</b> is selected, sends messages via rule node connection type <code>Inside</code> or <code>Outside</code> only the first time the geofencing and duration conditions are satisfied; otherwise sends messages via rule node connection type <code>Success</code>. " +
"If the presence monitoring strategy <b>\"On each message\"</b> is selected, sends messages via rule node connection type <code>Inside</code> or <code>Outside</code> every time the geofencing condition is satisfied. " +
"<br><br>" +
"Output connections: <code>Entered</code>, <code>Left</code>, <code>Inside</code>, <code>Outside</code>, <code>Success</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeGpsGeofencingConfig"
)
public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofencingActionNodeConfiguration> {
private static final String REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE = "reportPresenceStatusOnEachMessage";
private final Map<EntityId, EntityGeofencingState> entityStates = new HashMap<>();
private final Gson gson = new Gson();
private final JsonParser parser = new JsonParser();
@ -80,25 +97,32 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofe
}
});
boolean told = false;
if (entityState.getStateSwitchTime() == 0L || entityState.isInside() != matches) {
switchState(ctx, msg.getOriginator(), entityState, matches, ts);
ctx.tellNext(msg, matches ? "Entered" : "Left");
told = true;
} else {
if (!entityState.isStayed()) {
long stayTime = ts - entityState.getStateSwitchTime();
if (stayTime > (entityState.isInside() ?
TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) {
setStaid(ctx, msg.getOriginator(), entityState);
ctx.tellNext(msg, entityState.isInside() ? "Inside" : "Outside");
told = true;
}
}
ctx.tellNext(msg, matches ? ENTERED : LEFT);
return;
}
if (!told) {
if (config.isReportPresenceStatusOnEachMessage()) {
ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE);
return;
}
if (entityState.isStayed()) {
ctx.tellSuccess(msg);
return;
}
long stayTime = ts - entityState.getStateSwitchTime();
if (stayTime > (entityState.isInside() ?
TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) :
TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) {
setStaid(ctx, msg.getOriginator(), entityState);
ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE);
return;
}
ctx.tellSuccess(msg);
}
private void switchState(TbContext ctx, EntityId entityId, EntityGeofencingState entityState, boolean matches, long ts) {
@ -127,4 +151,17 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofe
protected Class<TbGpsGeofencingActionNodeConfiguration> getConfigClazz() {
return TbGpsGeofencingActionNodeConfiguration.class;
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
if (fromVersion == 0) {
if (!oldConfiguration.has(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE)) {
hasChanges = true;
((ObjectNode) oldConfiguration).put(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE, false);
}
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

View File

@ -31,6 +31,8 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte
private String minInsideDurationTimeUnit;
private String minOutsideDurationTimeUnit;
private boolean reportPresenceStatusOnEachMessage;
@Override
public TbGpsGeofencingActionNodeConfiguration defaultConfiguration() {
TbGpsGeofencingActionNodeConfiguration configuration = new TbGpsGeofencingActionNodeConfiguration();
@ -43,6 +45,7 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte
configuration.setMinOutsideDurationTimeUnit(TimeUnit.MINUTES.name());
configuration.setMinInsideDuration(1);
configuration.setMinOutsideDuration(1);
configuration.setReportPresenceStatusOnEachMessage(true);
return configuration;
}
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -63,6 +64,9 @@ public class EntitiesFieldsAsyncLoader {
case ENTITY_VIEW:
return toEntityFieldsDataAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) originatorId),
EntityFieldsData::new, ctx);
case EDGE:
return toEntityFieldsDataAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) originatorId),
EntityFieldsData::new, ctx);
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType: " + originatorId.getEntityType()));
}

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.util;
public class GpsGeofencingEvents {
public static final String ENTERED = "Entered";
public static final String INSIDE = "Inside";
public static final String LEFT = "Left";
public static final String OUTSIDE = "Outside";
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.geo;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.HashMap;
import java.util.Map;
@Data
public class GpsGeofencingActionTestCase {
private EntityId entityId;
private Map<EntityId, EntityGeofencingState> entityStates;
private boolean msgInside;
private boolean reportPresenceStatusOnEachMessage;
public GpsGeofencingActionTestCase(EntityId entityId, boolean msgInside, boolean reportPresenceStatusOnEachMessage, EntityGeofencingState entityGeofencingState) {
this.entityId = entityId;
this.msgInside = msgInside;
this.reportPresenceStatusOnEachMessage = reportPresenceStatusOnEachMessage;
this.entityStates = new HashMap<>();
this.entityStates.put(entityId, entityGeofencingState);
}
}

View File

@ -0,0 +1,259 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.geo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.time.Duration;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT;
import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE;
import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCESS;
@ExtendWith(MockitoExtension.class)
class TbGpsGeofencingActionNodeTest extends AbstractRuleNodeUpgradeTest {
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
private TbGpsGeofencingActionNode node;
@BeforeEach
void setUp() {
node = spy(new TbGpsGeofencingActionNode());
}
@AfterEach
void tearDown() {
node.destroy();
}
private static Stream<Arguments> givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType() {
DeviceId deviceId = new DeviceId(UUID.randomUUID());
long tsNow = System.currentTimeMillis();
long tsNowMinusMinuteAndMillis = tsNow - Duration.ofMinutes(1).plusMillis(1).toMillis();
return Stream.of(
// default config with presenceMonitoringStrategyOnEachMessage false and msgInside true
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false,
new EntityGeofencingState(false, 0, false)), ENTERED),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false,
new EntityGeofencingState(true, tsNow, false)), SUCCESS),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false,
new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false,
new EntityGeofencingState(true, tsNow, true)), SUCCESS),
// default config with presenceMonitoringStrategyOnEachMessage false and msgInside false
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false,
new EntityGeofencingState(false, 0, false)), LEFT),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false,
new EntityGeofencingState(false, tsNow, false)), SUCCESS),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false,
new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false,
new EntityGeofencingState(false, tsNow, true)), SUCCESS),
// default config with presenceMonitoringStrategyOnEachMessage true and msgInside true
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true,
new EntityGeofencingState(false, 0, false)), ENTERED),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true,
new EntityGeofencingState(true, tsNow, false)), INSIDE),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true,
new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE),
// default config with presenceMonitoringStrategyOnEachMessage true and msgInside false
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true,
new EntityGeofencingState(false, 0, false)), LEFT),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true,
new EntityGeofencingState(false, tsNow, false)), OUTSIDE),
Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true,
new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE)
);
}
@ParameterizedTest
@MethodSource
void givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType(
GpsGeofencingActionTestCase gpsGeofencingActionTestCase,
String expectedOutput
) throws TbNodeException {
// GIVEN
var config = new TbGpsGeofencingActionNodeConfiguration().defaultConfiguration();
config.setReportPresenceStatusOnEachMessage(gpsGeofencingActionTestCase.isReportPresenceStatusOnEachMessage());
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = gpsGeofencingActionTestCase.isMsgInside() ?
getInsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()) :
getOutsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId());
when(ctx.getAttributesService()).thenReturn(attributesService);
ReflectionTestUtils.setField(node, "entityStates", gpsGeofencingActionTestCase.getEntityStates());
// WHEN
node.onMsg(ctx, msg);
// THEN
verify(ctx.getAttributesService(), never()).find(any(), any(), any(), anyString());
verify(ctx, never()).tellFailure(any(), any(Throwable.class));
verify(ctx, never()).enqueueForTellNext(any(), eq(expectedOutput), any(), any());
verify(ctx, never()).ack(any());
if (SUCCESS.equals(expectedOutput)) {
verify(ctx).tellSuccess(eq(msg));
} else {
verify(ctx).tellNext(eq(msg), eq(expectedOutput));
}
}
private TbMsg getOutsideRectangleTbMsg(EntityId entityId) {
return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(),
GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLatitude(),
GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLongitude());
}
private TbMsg getInsideRectangleTbMsg(EntityId entityId) {
return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(),
GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLatitude(),
GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLongitude());
}
private TbMsg getTbMsg(EntityId entityId, TbMsgMetaData metadata, double latitude, double longitude) {
String data = "{\"latitude\": " + latitude + ", \"longitude\": " + longitude + "}";
return TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, entityId, metadata, data);
}
private TbMsgMetaData getMetadataForNewVersionPolygonPerimeter() {
var metadata = new TbMsgMetaData();
metadata.putValue("ss_perimeter", GeoUtilTest.SIMPLE_RECT);
return metadata;
}
// Rule nodes upgrade
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\n" +
" \"minInsideDuration\": 1,\n" +
" \"minOutsideDuration\": 1,\n" +
" \"minInsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"latitudeKeyName\": \"latitude\",\n" +
" \"longitudeKeyName\": \"longitude\",\n" +
" \"perimeterType\": \"POLYGON\",\n" +
" \"fetchPerimeterInfoFromMessageMetadata\": true,\n" +
" \"perimeterKeyName\": \"ss_perimeter\",\n" +
" \"polygonsDefinition\": null,\n" +
" \"centerLatitude\": null,\n" +
" \"centerLongitude\": null,\n" +
" \"range\": null,\n" +
" \"rangeUnit\": null\n" +
"}\n",
true,
"{\n" +
" \"minInsideDuration\": 1,\n" +
" \"minOutsideDuration\": 1,\n" +
" \"minInsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"reportPresenceStatusOnEachMessage\": false,\n" +
" \"latitudeKeyName\": \"latitude\",\n" +
" \"longitudeKeyName\": \"longitude\",\n" +
" \"perimeterType\": \"POLYGON\",\n" +
" \"fetchPerimeterInfoFromMessageMetadata\": true,\n" +
" \"perimeterKeyName\": \"ss_perimeter\",\n" +
" \"polygonsDefinition\": null,\n" +
" \"centerLatitude\": null,\n" +
" \"centerLongitude\": null,\n" +
" \"range\": null,\n" +
" \"rangeUnit\": null\n" +
"}\n"),
// default config for version 1 with upgrade from version 0
Arguments.of(0,
"{\n" +
" \"minInsideDuration\": 1,\n" +
" \"minOutsideDuration\": 1,\n" +
" \"minInsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"reportPresenceStatusOnEachMessage\": false,\n" +
" \"latitudeKeyName\": \"latitude\",\n" +
" \"longitudeKeyName\": \"longitude\",\n" +
" \"perimeterType\": \"POLYGON\",\n" +
" \"fetchPerimeterInfoFromMessageMetadata\": true,\n" +
" \"perimeterKeyName\": \"ss_perimeter\",\n" +
" \"polygonsDefinition\": null,\n" +
" \"centerLatitude\": null,\n" +
" \"centerLongitude\": null,\n" +
" \"range\": null,\n" +
" \"rangeUnit\": null\n" +
"}\n",
false,
"{\n" +
" \"minInsideDuration\": 1,\n" +
" \"minOutsideDuration\": 1,\n" +
" \"minInsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" +
" \"reportPresenceStatusOnEachMessage\": false,\n" +
" \"latitudeKeyName\": \"latitude\",\n" +
" \"longitudeKeyName\": \"longitude\",\n" +
" \"perimeterType\": \"POLYGON\",\n" +
" \"fetchPerimeterInfoFromMessageMetadata\": true,\n" +
" \"perimeterKeyName\": \"ss_perimeter\",\n" +
" \"polygonsDefinition\": null,\n" +
" \"centerLatitude\": null,\n" +
" \"centerLongitude\": null,\n" +
" \"range\": null,\n" +
" \"rangeUnit\": null\n" +
"}\n")
);
}
@Override
protected TbNode getTestNode() {
return node;
}
}

View File

@ -37,10 +37,12 @@ import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
@ -52,6 +54,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantService;
@ -95,6 +98,8 @@ public class EntitiesFieldsAsyncLoaderTest {
private RuleChainService ruleChainServiceMock;
@Mock
private EntityViewService entityViewServiceMock;
@Mock
private EdgeService edgeServiceMock;
@BeforeAll
public static void setup() {
@ -108,7 +113,8 @@ public class EntitiesFieldsAsyncLoaderTest {
EntityType.DEVICE,
EntityType.ALARM,
EntityType.RULE_CHAIN,
EntityType.ENTITY_VIEW
EntityType.ENTITY_VIEW,
EntityType.EDGE
);
}
@ -228,6 +234,14 @@ public class EntitiesFieldsAsyncLoaderTest {
when(ctxMock.getEntityViewService()).thenReturn(entityViewServiceMock);
doReturn(entityView).when(entityViewServiceMock).findEntityViewByIdAsync(eq(TENANT_ID), any());
break;
case EDGE:
var edge = Futures.immediateFuture(entityDoesNotExist ? null : new Edge(new EdgeId(RANDOM_UUID)));
when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR);
when(ctxMock.getEdgeService()).thenReturn(edgeServiceMock);
doReturn(edge).when(edgeServiceMock).findEdgeByIdAsync(eq(TENANT_ID), any());
break;
default:
throw new RuntimeException("Unexpected EntityType: " + entityType);
@ -252,6 +266,8 @@ public class EntitiesFieldsAsyncLoaderTest {
return new RuleChain((RuleChainId) entityId);
case ENTITY_VIEW:
return new EntityView((EntityViewId) entityId);
case EDGE:
return new Edge((EdgeId) entityId);
default:
throw new RuntimeException("Unexpected EntityType: " + entityId.getEntityType());
}