diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 58bd82abb3..f1c0260124 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -66,7 +66,6 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNoti import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; @@ -90,7 +89,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.gen.transport.TransportProtos.UplinkNotificationMsg; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; +import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; @@ -173,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private EdgeId findRelatedEdgeId() { List result = - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); if (result != null && result.size() > 0) { EntityRelation relationToEdge = result.get(0); if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { @@ -212,8 +213,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); try { - saveRpcRequestToEdgeQueue(request, requestId).get(); - sent = true; + if (systemContext.getEdgeService().isEdgeActiveAsync(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE).get()) { + saveRpcRequestToEdgeQueue(request, requestId).get(); + } else { + log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request); + } } catch (InterruptedException | ExecutionException e) { log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e); } @@ -470,7 +474,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso callback.onSuccess(); } - private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) { + private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, UplinkNotificationMsg uplinkNotificationMsg) { String nodeId = sessionInfo.getNodeId(); sessions.entrySet().stream() .filter(kv -> kv.getValue().getSessionInfo().getNodeId().equals(nodeId) && (kv.getValue().isSubscribedToAttributes() || kv.getValue().isSubscribedToRPC())) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index e14ed8203b..870ccda60a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -16,7 +16,6 @@ package org.thingsboard.server.actors.ruleChain; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; @@ -29,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.data.relation.EntityRelation; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index c23fd98d8e..026ca51e8d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -477,6 +477,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i TbMsgMetaData md = new TbMsgMetaData(); if (!persistToTelemetry) { md.putValue(DataConstants.SCOPE, DataConstants.SERVER_SCOPE); + md.putValue("edgeName", edge.getName()); + md.putValue("edgeType", edge.getType()); } TbMsg tbMsg = TbMsg.newMsg(msgType, edgeId, md, TbMsgDataType.JSON, data); clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java index 2a191dfaf4..118cc0ccd4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor.rule; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.rule.RuleChain; @@ -53,6 +54,10 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor { isRoot = Boolean.parseBoolean(edgeEvent.getBody().get(EDGE_IS_ROOT_BODY_KEY).asText()); } catch (Exception ignored) {} } + if (!isRoot) { + Edge edge = edgeService.findEdgeById(edgeEvent.getTenantId(), edgeEvent.getEdgeId()); + isRoot = edge.getRootRuleChainId().equals(ruleChainId); + } UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor) ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index d7ddd5d81b..1f9e85bdc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.TbMsgCallback; -import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import java.util.UUID; import java.util.concurrent.TimeUnit; diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 488bd4ed70..458bb3240e 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -711,7 +711,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(30, TimeUnit.SECONDS)); + Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(TIMEOUT, TimeUnit.SECONDS)); Assert.assertEquals(JacksonUtil.newObjectNode().put(attrKey, attrValue), JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes())); @@ -798,7 +798,21 @@ public class DeviceEdgeTest extends AbstractEdgeTest { // clean up stored edge events edgeEventService.cleanupEvents(1); - // perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + // edge is disconnected: perform rpc call - no edge event saved + doPostAsync( + "/api/rpc/oneway/" + device.getId().getId().toString(), + JacksonUtil.toString(createDefaultRpc()), + String.class, + status().isOk()); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + return result.getTotalElements() == 0; + }); + + // edge is connected: perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + simulateEdgeActivation(tmpEdge); doPostAsync( "/api/rpc/oneway/" + device.getId().getId().toString(), JacksonUtil.toString(createDefaultRpc()), @@ -857,4 +871,23 @@ public class DeviceEdgeTest extends AbstractEdgeTest { return rpc; } + + private void simulateEdgeActivation(Edge edge) throws Exception { + ObjectNode attributes = JacksonUtil.newObjectNode(); + attributes.put("active", true); + doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + List> values = doGetAsyncTyped("/api/plugins/telemetry/EDGE/" + edge.getId() + + "/values/attributes/SERVER_SCOPE", new TypeReference<>() {}); + Optional> activeAttrOpt = values.stream().filter(att -> att.get("key").equals("active")).findFirst(); + if (activeAttrOpt.isEmpty()) { + return false; + } + Map activeAttr = activeAttrOpt.get(); + return "true".equals(activeAttr.get("value").toString()); + }); + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java index 4942dbdb9b..62d2c5eade 100644 --- a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java @@ -146,7 +146,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { } } - private void createRuleChainMetadata(RuleChain ruleChain) { + private RuleChainMetaData createRuleChainMetadata(RuleChain ruleChain) { RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); ruleChainMetaData.setRuleChainId(ruleChain.getId()); @@ -182,7 +182,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { ruleChainMetaData.addConnectionInfo(0, 2, "fail"); ruleChainMetaData.addConnectionInfo(1, 2, "success"); - doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class); + return doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class); } @Test @@ -193,9 +193,10 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { ruleChain.setType(RuleChainType.EDGE); RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doPost("/api/edge/" + edge.getUuidId() + "/ruleChain/" + savedRuleChain.getUuidId(), RuleChain.class); + RuleChainMetaData metaData = createRuleChainMetadata(savedRuleChain); Assert.assertTrue(edgeImitator.waitForMessages()); // set new rule chain as root @@ -213,6 +214,22 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { Assert.assertTrue(ruleChainMsg.isRoot()); Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId()); + // update metadata for root rule chain + edgeImitator.expectMessageAmount(1); + metaData.getNodes().forEach(n -> n.setDebugMode(true)); + doPost("/api/ruleChain/metadata", metaData, RuleChainMetaData.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class); + Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent()); + ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get(); + ruleChainMsg = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true); + Assert.assertNotNull(ruleChainMsg); + Assert.assertTrue(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType()) || + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType())); + Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId()); + Assert.assertEquals(savedRuleChain.getName(), ruleChainMsg.getName()); + Assert.assertTrue(ruleChainMsg.isRoot()); + // revert root rule chain edgeImitator.expectMessageAmount(1); doPost("/api/edge/" + edge.getUuidId() diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f5e018ed3a..916e5790e9 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; +import org.thingsboard.server.controller.AbstractWebTest; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; @@ -72,8 +73,6 @@ import java.util.stream.Collectors; @Slf4j public class EdgeImitator { - public static final int TIMEOUT_IN_SECONDS = 30; - private String routingKey; private String routingSecret; @@ -344,7 +343,7 @@ public class EdgeImitator { } public boolean waitForMessages() throws InterruptedException { - return waitForMessages(TIMEOUT_IN_SECONDS); + return waitForMessages(AbstractWebTest.TIMEOUT); } public boolean waitForMessages(int timeoutInSeconds) throws InterruptedException { @@ -359,7 +358,7 @@ public class EdgeImitator { } public boolean waitForResponses() throws InterruptedException { - return responsesLatch.await(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + return responsesLatch.await(AbstractWebTest.TIMEOUT, TimeUnit.SECONDS); } public void expectResponsesAmount(int messageAmount) { diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java index ce6ca753b3..d6bfbf1036 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; @@ -61,7 +60,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData( ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); @@ -80,7 +79,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData( ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); @@ -120,7 +119,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = @@ -161,7 +160,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() { // same rule chain metadata, but different order of rule nodes RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData1 = createRuleChainMetaData(ruleChainId, 8, createRuleNodesInDifferentOrder(ruleChainId), createConnectionsInDifferentOrder()); @@ -254,7 +253,7 @@ public class RuleChainMsgConstructorTest { return result; } - private List createRuleNodes(RuleChainId ruleChainId) throws JsonProcessingException { + private List createRuleNodes(RuleChainId ruleChainId) { List result = new ArrayList<>(); result.add(getOutputNode(ruleChainId)); result.add(getAcknowledgeNode(ruleChainId)); @@ -301,7 +300,7 @@ public class RuleChainMsgConstructorTest { return result; } - private List createRuleNodesInDifferentOrder(RuleChainId ruleChainId) throws JsonProcessingException { + private List createRuleNodesInDifferentOrder(RuleChainId ruleChainId) { List result = new ArrayList<>(); result.add(getPushToAnalyticsNode(ruleChainId)); result.add(getPushToCloudNode(ruleChainId)); @@ -319,99 +318,99 @@ public class RuleChainMsgConstructorTest { } - private RuleNode getOutputNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getOutputNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode", "Output node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}")); } - private RuleNode getCheckpointNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getCheckpointNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbCheckpointNode", "Checkpoint node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"queueName\":\"HighPriority\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}")); + JacksonUtil.toJsonNode("{\"queueName\":\"HighPriority\"}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}")); } - private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "Save Timeseries", - JacksonUtil.OBJECT_MAPPER.readTree("{\"defaultTTL\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":823,\"layoutY\":157}")); + JacksonUtil.toJsonNode("{\"defaultTTL\":0}"), + JacksonUtil.toJsonNode("{\"layoutX\":823,\"layoutY\":157}")); } - private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "Message Type Switch", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":347,\"layoutY\":149}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"layoutX\":347,\"layoutY\":149}")); } - private RuleNode getLogOtherNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getLogOtherNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.action.TbLogNode", "Log Other", - JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":378}")); + JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":378}")); } - private RuleNode getPushToCloudNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getPushToCloudNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "Push to cloud", - JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"SERVER_SCOPE\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":1129,\"layoutY\":52}")); + JacksonUtil.toJsonNode("{\"scope\":\"SERVER_SCOPE\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":1129,\"layoutY\":52}")); } - private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbAckNode", "Acknowledge node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}")); } - private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "Device Profile Node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}")); + JacksonUtil.toJsonNode("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"), + JacksonUtil.toJsonNode("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}")); } - private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "Save Client Attributes", - JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"CLIENT_SCOPE\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":52}")); + JacksonUtil.toJsonNode("{\"scope\":\"CLIENT_SCOPE\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":52}")); } - private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.action.TbLogNode", "Log RPC from Device", - JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":825,\"layoutY\":266}")); + JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":825,\"layoutY\":266}")); } - private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "RPC Call Request", - JacksonUtil.OBJECT_MAPPER.readTree("{\"timeoutInSeconds\":60}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":466}")); + JacksonUtil.toJsonNode("{\"timeoutInSeconds\":60}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":466}")); } - private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbRuleChainInputNode", "Push to Analytics", - JacksonUtil.OBJECT_MAPPER.readTree("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}")); + JacksonUtil.toJsonNode("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}")); } -} \ No newline at end of file +} diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 9c45977a7c..835b39caf9 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -55,6 +55,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; @@ -209,6 +210,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AttributesService attributesService; + @MockBean + protected TimeseriesService timeseriesService; + @MockBean protected TbClusterService tbClusterService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 15e55d8713..d6d5551ccf 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -91,4 +91,6 @@ public interface EdgeService extends EntityDaoService { PageData findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName); + + ListenableFuture isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String activityState); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 80eea0a478..136fd0485d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -30,10 +30,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; @@ -48,14 +48,15 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.exception.DataValidationException; @@ -64,7 +65,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; -import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import javax.annotation.Nullable; @@ -105,7 +106,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService edgeValidator; @@ -113,6 +117,8 @@ public class EdgeServiceImpl extends AbstractCachedEntityService isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String key) { + ListenableFuture> futureKvEntry; + if (persistToTelemetry) { + futureKvEntry = timeseriesService.findLatest(tenantId, edgeId, key); + } else { + futureKvEntry = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, key); + } + return Futures.transformAsync(futureKvEntry, kvEntryOpt -> + Futures.immediateFuture(kvEntryOpt.flatMap(KvEntry::getBooleanValue).orElse(false)), MoreExecutors.directExecutor()); + } + private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { List result = new ArrayList<>(); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); diff --git a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java index 1ef0006e87..fd17618bd1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java +++ b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java @@ -40,9 +40,9 @@ import org.thingsboard.server.common.data.notification.rule.trigger.config.Alarm import org.thingsboard.server.common.data.notification.rule.trigger.config.ApiUsageLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig.DeviceEvent; +import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig.EdgeConnectivityEvent; -import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntitiesLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntityActionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.NewPlatformVersionNotificationRuleTriggerConfig; @@ -347,7 +347,7 @@ public class DefaultNotifications { public static final DefaultNotification edgeCommunicationFailures = DefaultNotification.builder() .name("Edge communication failure notification") .type(NotificationType.EDGE_COMMUNICATION_FAILURE) - .subject("Edge '${edgeName}' communication failure occured") + .subject("Edge '${edgeName}' communication failure occurred") .text("Failure message: '${failureMsg}'") .icon("error").color(RED_COLOR) .button("Go to Edge").link("/edgeManagement/instances/${edgeId}") diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index f3a7ee77c7..8783770be3 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -525,12 +525,11 @@ public class RestClient implements Closeable { } public PageData getAlarmComments(AlarmId alarmId, PageLink pageLink) { - String urlSecondPart = "/api/alarm/{alarmId}/comment"; Map params = new HashMap<>(); params.put("alarmId", alarmId.getId().toString()); - + addPageLinkToParam(params, pageLink); return restTemplate.exchange( - baseURL + urlSecondPart + "&" + getUrlParams(pageLink), + baseURL + "/api/alarm/{alarmId}/comment?" + getUrlParams(pageLink), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference>() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 31fda250d0..4176ead31c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -47,6 +47,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED; +import static org.thingsboard.server.common.data.msg.TbMsgType.TO_SERVER_RPC_REQUEST; @Slf4j public abstract class AbstractTbMsgPushNode implements TbNode { @@ -176,6 +177,6 @@ public abstract class AbstractTbMsgPushNode
" + + "If an object with coordinates extracted from incoming message enters the geofence, sends a message with the type Entered. " + + "If an object leaves the geofence, sends a message with the type Left. " + + "If the presence monitoring strategy \"On first message\" is selected, sends messages via rule node connection type Inside or Outside only the first time the geofencing and duration conditions are satisfied; otherwise sends messages via rule node connection type Success. " + + "If the presence monitoring strategy \"On each message\" is selected, sends messages via rule node connection type Inside or Outside every time the geofencing condition is satisfied. " + + "

" + + "Output connections: Entered, Left, Inside, Outside, Success", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeGpsGeofencingConfig" ) public class TbGpsGeofencingActionNode extends AbstractGeofencingNode { + private static final String REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE = "reportPresenceStatusOnEachMessage"; private final Map entityStates = new HashMap<>(); private final Gson gson = new Gson(); private final JsonParser parser = new JsonParser(); @@ -80,25 +97,32 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode (entityState.isInside() ? - TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) { - setStaid(ctx, msg.getOriginator(), entityState); - ctx.tellNext(msg, entityState.isInside() ? "Inside" : "Outside"); - told = true; - } - } + ctx.tellNext(msg, matches ? ENTERED : LEFT); + return; } - if (!told) { + + if (config.isReportPresenceStatusOnEachMessage()) { + ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); + return; + } + + if (entityState.isStayed()) { ctx.tellSuccess(msg); + return; } + + long stayTime = ts - entityState.getStateSwitchTime(); + if (stayTime > (entityState.isInside() ? + TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : + TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) { + setStaid(ctx, msg.getOriginator(), entityState); + ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); + return; + } + + ctx.tellSuccess(msg); } private void switchState(TbContext ctx, EntityId entityId, EntityGeofencingState entityState, boolean matches, long ts) { @@ -127,4 +151,17 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode getConfigClazz() { return TbGpsGeofencingActionNodeConfiguration.class; } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + if (fromVersion == 0) { + if (!oldConfiguration.has(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE, false); + } + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java index d0adad8996..04bbab01b7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java @@ -31,6 +31,8 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte private String minInsideDurationTimeUnit; private String minOutsideDurationTimeUnit; + private boolean reportPresenceStatusOnEachMessage; + @Override public TbGpsGeofencingActionNodeConfiguration defaultConfiguration() { TbGpsGeofencingActionNodeConfiguration configuration = new TbGpsGeofencingActionNodeConfiguration(); @@ -43,6 +45,7 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte configuration.setMinOutsideDurationTimeUnit(TimeUnit.MINUTES.name()); configuration.setMinInsideDuration(1); configuration.setMinOutsideDuration(1); + configuration.setReportPresenceStatusOnEachMessage(true); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java index 80d73b9478..4d651f48fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; @@ -63,6 +64,9 @@ public class EntitiesFieldsAsyncLoader { case ENTITY_VIEW: return toEntityFieldsDataAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) originatorId), EntityFieldsData::new, ctx); + case EDGE: + return toEntityFieldsDataAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) originatorId), + EntityFieldsData::new, ctx); default: return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType: " + originatorId.getEntityType())); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java new file mode 100644 index 0000000000..db5ddaf698 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +public class GpsGeofencingEvents { + public static final String ENTERED = "Entered"; + public static final String INSIDE = "Inside"; + public static final String LEFT = "Left"; + public static final String OUTSIDE = "Outside"; +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java new file mode 100644 index 0000000000..804d30d28f --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.geo; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.HashMap; +import java.util.Map; + +@Data +public class GpsGeofencingActionTestCase { + + private EntityId entityId; + private Map entityStates; + private boolean msgInside; + private boolean reportPresenceStatusOnEachMessage; + + public GpsGeofencingActionTestCase(EntityId entityId, boolean msgInside, boolean reportPresenceStatusOnEachMessage, EntityGeofencingState entityGeofencingState) { + this.entityId = entityId; + this.msgInside = msgInside; + this.reportPresenceStatusOnEachMessage = reportPresenceStatusOnEachMessage; + this.entityStates = new HashMap<>(); + this.entityStates.put(entityId, entityGeofencingState); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java new file mode 100644 index 0000000000..5cd7e02c59 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -0,0 +1,259 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.geo; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.attributes.AttributesService; + +import java.time.Duration; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE; +import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCESS; + +@ExtendWith(MockitoExtension.class) +class TbGpsGeofencingActionNodeTest extends AbstractRuleNodeUpgradeTest { + + @Mock + private TbContext ctx; + @Mock + private AttributesService attributesService; + private TbGpsGeofencingActionNode node; + + @BeforeEach + void setUp() { + node = spy(new TbGpsGeofencingActionNode()); + } + + @AfterEach + void tearDown() { + node.destroy(); + } + + private static Stream givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType() { + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long tsNow = System.currentTimeMillis(); + long tsNowMinusMinuteAndMillis = tsNow - Duration.ofMinutes(1).plusMillis(1).toMillis(); + return Stream.of( + // default config with presenceMonitoringStrategyOnEachMessage false and msgInside true + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNow, true)), SUCCESS), + // default config with presenceMonitoringStrategyOnEachMessage false and msgInside false + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNow, true)), SUCCESS), + // default config with presenceMonitoringStrategyOnEachMessage true and msgInside true + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(true, tsNow, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + // default config with presenceMonitoringStrategyOnEachMessage true and msgInside false + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, tsNow, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE) + ); + } + + @ParameterizedTest + @MethodSource + void givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType( + GpsGeofencingActionTestCase gpsGeofencingActionTestCase, + String expectedOutput + ) throws TbNodeException { + // GIVEN + var config = new TbGpsGeofencingActionNodeConfiguration().defaultConfiguration(); + config.setReportPresenceStatusOnEachMessage(gpsGeofencingActionTestCase.isReportPresenceStatusOnEachMessage()); + + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + TbMsg msg = gpsGeofencingActionTestCase.isMsgInside() ? + getInsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()) : + getOutsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()); + + when(ctx.getAttributesService()).thenReturn(attributesService); + + ReflectionTestUtils.setField(node, "entityStates", gpsGeofencingActionTestCase.getEntityStates()); + + // WHEN + node.onMsg(ctx, msg); + + // THEN + verify(ctx.getAttributesService(), never()).find(any(), any(), any(), anyString()); + verify(ctx, never()).tellFailure(any(), any(Throwable.class)); + verify(ctx, never()).enqueueForTellNext(any(), eq(expectedOutput), any(), any()); + verify(ctx, never()).ack(any()); + + if (SUCCESS.equals(expectedOutput)) { + verify(ctx).tellSuccess(eq(msg)); + } else { + verify(ctx).tellNext(eq(msg), eq(expectedOutput)); + } + } + + private TbMsg getOutsideRectangleTbMsg(EntityId entityId) { + return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(), + GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLatitude(), + GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLongitude()); + } + + private TbMsg getInsideRectangleTbMsg(EntityId entityId) { + return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(), + GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLatitude(), + GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLongitude()); + } + + private TbMsg getTbMsg(EntityId entityId, TbMsgMetaData metadata, double latitude, double longitude) { + String data = "{\"latitude\": " + latitude + ", \"longitude\": " + longitude + "}"; + return TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, entityId, metadata, data); + } + + private TbMsgMetaData getMetadataForNewVersionPolygonPerimeter() { + var metadata = new TbMsgMetaData(); + metadata.putValue("ss_perimeter", GeoUtilTest.SIMPLE_RECT); + return metadata; + } + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n", + true, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n", + false, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n") + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java index 5967023354..f3e61dcffc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java @@ -37,10 +37,12 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; @@ -52,6 +54,7 @@ import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; @@ -95,6 +98,8 @@ public class EntitiesFieldsAsyncLoaderTest { private RuleChainService ruleChainServiceMock; @Mock private EntityViewService entityViewServiceMock; + @Mock + private EdgeService edgeServiceMock; @BeforeAll public static void setup() { @@ -108,7 +113,8 @@ public class EntitiesFieldsAsyncLoaderTest { EntityType.DEVICE, EntityType.ALARM, EntityType.RULE_CHAIN, - EntityType.ENTITY_VIEW + EntityType.ENTITY_VIEW, + EntityType.EDGE ); } @@ -228,6 +234,14 @@ public class EntitiesFieldsAsyncLoaderTest { when(ctxMock.getEntityViewService()).thenReturn(entityViewServiceMock); doReturn(entityView).when(entityViewServiceMock).findEntityViewByIdAsync(eq(TENANT_ID), any()); + break; + case EDGE: + var edge = Futures.immediateFuture(entityDoesNotExist ? null : new Edge(new EdgeId(RANDOM_UUID))); + + when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR); + when(ctxMock.getEdgeService()).thenReturn(edgeServiceMock); + doReturn(edge).when(edgeServiceMock).findEdgeByIdAsync(eq(TENANT_ID), any()); + break; default: throw new RuntimeException("Unexpected EntityType: " + entityType); @@ -252,6 +266,8 @@ public class EntitiesFieldsAsyncLoaderTest { return new RuleChain((RuleChainId) entityId); case ENTITY_VIEW: return new EntityView((EntityViewId) entityId); + case EDGE: + return new Edge((EdgeId) entityId); default: throw new RuntimeException("Unexpected EntityType: " + entityId.getEntityType()); }