From 949d3065609ad712df8df7ea5af03abea746679b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 12 Jan 2022 15:42:43 +0200 Subject: [PATCH] Added functionality to support 3.3.0 edge version rule chains --- .../service/edge/rpc/EdgeGrpcSession.java | 9 +- .../constructor/RuleChainMsgConstructor.java | 193 +++++++++- .../rpc/processor/RuleChainEdgeProcessor.java | 5 +- .../RuleChainMsgConstructorTest.java | 358 ++++++++++++++++++ common/edge-api/src/main/proto/edge.proto | 6 + 5 files changed, 546 insertions(+), 25 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 46b36ca99f..2fc8cdf841 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -52,6 +52,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.DownlinkResponseMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.gen.edge.v1.EdgeUpdateMsg; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.edge.v1.EntityViewsRequestMsg; import org.thingsboard.server.gen.edge.v1.RelationRequestMsg; @@ -73,14 +74,11 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; @@ -109,6 +107,8 @@ public final class EdgeGrpcSession implements Closeable { private boolean connected; private boolean syncCompleted; + private EdgeVersion edgeVersion; + private ScheduledExecutorService sendDownlinkExecutorService; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, @@ -525,7 +525,7 @@ public final class EdgeGrpcSession implements Closeable { case RULE_CHAIN: return ctx.getRuleChainProcessor().processRuleChainToEdge(edge, edgeEvent, msgType, action); case RULE_CHAIN_METADATA: - return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType); + return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType, this.edgeVersion); case ALARM: return ctx.getAlarmProcessor().processAlarmToEdge(edge, edgeEvent, msgType, action); case USER: @@ -655,6 +655,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (edge.getSecret().equals(request.getEdgeSecret())) { sessionOpenListener.accept(edge.getId(), this); + this.edgeVersion = request.getEdgeVersion(); return ConnectResponseMsg.newBuilder() .setResponseCode(ConnectResponseCode.ACCEPTED) .setErrorMsg("") diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java index 11b14a78dc..e564b921f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java @@ -17,15 +17,18 @@ package org.thingsboard.server.service.edge.rpc.constructor; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto; import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto; import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg; @@ -36,6 +39,10 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.ArrayList; import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.stream.Collectors; @Component @Slf4j @@ -43,6 +50,8 @@ import java.util.List; public class RuleChainMsgConstructor { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String RULE_CHAIN_INPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainInputNode"; + private static final String TB_RULE_CHAIN_OUTPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode"; public RuleChainUpdateMsg constructRuleChainUpdatedMsg(RuleChainId edgeRootRuleChainId, UpdateMsgType msgType, RuleChain ruleChain) { RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder() @@ -60,18 +69,19 @@ public class RuleChainMsgConstructor { return builder.build(); } - public RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) { + public RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, + RuleChainMetaData ruleChainMetaData, + EdgeVersion edgeVersion) { try { - RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder() - .setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits()) - .setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits()) - .addAllNodes(constructNodes(ruleChainMetaData.getNodes())) - .addAllConnections(constructConnections(ruleChainMetaData.getConnections())) - .addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections())); - if (ruleChainMetaData.getFirstNodeIndex() != null) { - builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex()); - } else { - builder.setFirstNodeIndex(-1); + RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder(); + switch (edgeVersion) { + case V_3_3_0: + constructRuleChainMetadataUpdatedMsg_V_3_3_0(builder, ruleChainMetaData); + break; + case V_3_3_3: + default: + constructRuleChainMetadataUpdatedMsg_V_3_3_3(builder, ruleChainMetaData); + break; } builder.setMsgType(msgType); return builder.build(); @@ -81,6 +91,104 @@ public class RuleChainMsgConstructor { return null; } + private void constructRuleChainMetadataUpdatedMsg_V_3_3_3(RuleChainMetadataUpdateMsg.Builder builder, + RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + builder.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits()) + .setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits()) + .addAllNodes(constructNodes(ruleChainMetaData.getNodes())) + .addAllConnections(constructConnections(ruleChainMetaData.getConnections())) + .addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>())); + if (ruleChainMetaData.getFirstNodeIndex() != null) { + builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex()); + } else { + builder.setFirstNodeIndex(-1); + } + } + + private void constructRuleChainMetadataUpdatedMsg_V_3_3_0(RuleChainMetadataUpdateMsg.Builder builder, + RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + List supportedNodes = filterNodes_V_3_3_0(ruleChainMetaData.getNodes()); + NavigableSet removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections()); + List connections = filterConnections_V_3_3_0(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections(), removedNodeIndexes); + + List ruleChainConnections = new ArrayList<>(); + if (ruleChainMetaData.getRuleChainConnections() != null) { + ruleChainConnections.addAll(ruleChainMetaData.getRuleChainConnections()); + } + ruleChainConnections.addAll(addRuleChainConnections_V_3_3_0(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections())); + builder.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits()) + .setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits()) + .addAllNodes(constructNodes(supportedNodes)) + .addAllConnections(constructConnections(connections)) + .addAllRuleChainConnections(constructRuleChainConnections(ruleChainConnections, removedNodeIndexes)); + if (ruleChainMetaData.getFirstNodeIndex() != null) { + Integer firstNodeIndex = ruleChainMetaData.getFirstNodeIndex(); + // decrease index because of removed nodes + for (Integer removedIndex : removedNodeIndexes) { + if (firstNodeIndex > removedIndex) { + firstNodeIndex = firstNodeIndex - 1; + } + } + builder.setFirstNodeIndex(firstNodeIndex); + } else { + builder.setFirstNodeIndex(-1); + } + } + + private List filterConnections_V_3_3_0(List nodes, List connections, NavigableSet removedNodeIndexes) { + List result = new ArrayList<>(); + if (connections != null) { + result = connections.stream().filter(conn -> { + for (int i = 0; i < nodes.size(); i++) { + RuleNode node = nodes.get(i); + if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE) + || node.getType().equalsIgnoreCase(TB_RULE_CHAIN_OUTPUT_NODE)) { + if (conn.getFromIndex() == i || conn.getToIndex() == i) { + return false; + } + } + } + return true; + }).map(conn -> { + NodeConnectionInfo newConn = new NodeConnectionInfo(); + newConn.setFromIndex(conn.getFromIndex()); + newConn.setToIndex(conn.getToIndex()); + newConn.setType(conn.getType()); + return newConn; + }).collect(Collectors.toList()); + } + + // decrease index because of removed nodes + for (Integer removedIndex : removedNodeIndexes) { + for (NodeConnectionInfo newConn : result) { + if (newConn.getToIndex() > removedIndex) { + newConn.setToIndex(newConn.getToIndex() - 1); + } + if (newConn.getFromIndex() > removedIndex) { + newConn.setFromIndex(newConn.getFromIndex() - 1); + } + } + } + + return result; + } + + private NavigableSet getRemovedNodeIndexes(List nodes, List connections) { + TreeSet removedIndexes = new TreeSet<>(); + for (NodeConnectionInfo connection : connections) { + for (int i = 0; i < nodes.size(); i++) { + RuleNode node = nodes.get(i); + if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE) + || node.getType().equalsIgnoreCase(TB_RULE_CHAIN_OUTPUT_NODE)) { + if (connection.getFromIndex() == i || connection.getToIndex() == i) { + removedIndexes.add(i); + } + } + } + } + return removedIndexes.descendingSet(); + } + private List constructConnections(List connections) { List result = new ArrayList<>(); if (connections != null && !connections.isEmpty()) { @@ -99,6 +207,21 @@ public class RuleChainMsgConstructor { .build(); } + private List filterNodes_V_3_3_0(List nodes) { + List result = new ArrayList<>(); + for (RuleNode node : nodes) { + switch (node.getType()) { + case RULE_CHAIN_INPUT_NODE: + case TB_RULE_CHAIN_OUTPUT_NODE: + log.trace("Skipping not supported rule node {}", node); + break; + default: + result.add(node); + } + } + return result; + } + private List constructNodes(List nodes) throws JsonProcessingException { List result = new ArrayList<>(); if (nodes != null && !nodes.isEmpty()) { @@ -109,23 +232,55 @@ public class RuleChainMsgConstructor { return result; } - private List constructRuleChainConnections(List ruleChainConnections) throws JsonProcessingException { - List result = new ArrayList<>(); - if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) { - for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) { - result.add(constructRuleChainConnection(ruleChainConnectionInfo)); + private List addRuleChainConnections_V_3_3_0(List nodes, List connections) throws JsonProcessingException { + List result = new ArrayList<>(); + for (int i = 0; i < nodes.size(); i++) { + RuleNode node = nodes.get(i); + if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE)) { + for (NodeConnectionInfo connection : connections) { + if (connection.getToIndex() == i) { + RuleChainConnectionInfo e = new RuleChainConnectionInfo(); + e.setFromIndex(connection.getFromIndex()); + TbRuleChainInputNodeConfiguration configuration = JacksonUtil.treeToValue(node.getConfiguration(), TbRuleChainInputNodeConfiguration.class); + e.setTargetRuleChainId(new RuleChainId(UUID.fromString(configuration.getRuleChainId()))); + e.setAdditionalInfo(node.getAdditionalInfo()); + e.setType(connection.getType()); + result.add(e); + } + } } } return result; } - private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException { + private List constructRuleChainConnections(List ruleChainConnections, NavigableSet removedNodeIndexes) throws JsonProcessingException { + List result = new ArrayList<>(); + if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) { + for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) { + result.add(constructRuleChainConnection(ruleChainConnectionInfo, removedNodeIndexes)); + } + } + return result; + } + + private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo, NavigableSet removedNodeIndexes) throws JsonProcessingException { + int fromIndex = ruleChainConnectionInfo.getFromIndex(); + // decrease index because of removed nodes + for (Integer removedIndex : removedNodeIndexes) { + if (fromIndex > removedIndex) { + fromIndex = fromIndex - 1; + } + } + ObjectNode additionalInfo = (ObjectNode) ruleChainConnectionInfo.getAdditionalInfo(); + if (additionalInfo.get("ruleChainNodeId") == null) { + additionalInfo.put("ruleChainNodeId", "rule-chain-node-UNDEFINED"); + } return RuleChainConnectionInfoProto.newBuilder() - .setFromIndex(ruleChainConnectionInfo.getFromIndex()) + .setFromIndex(fromIndex) .setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits()) .setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits()) .setType(ruleChainConnectionInfo.getType()) - .setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo())) + .setAdditionalInfo(objectMapper.writeValueAsString(additionalInfo)) .build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java index 5ead8c5623..eb32340e87 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -63,14 +64,14 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor { return downlinkMsg; } - public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) { + public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeVersion edgeVersion) { RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId); DownlinkMsg downlinkMsg = null; if (ruleChain != null) { RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId); RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = - ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); + ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData, edgeVersion); if (ruleChainMetadataUpdateMsg != null) { downlinkMsg = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java new file mode 100644 index 0000000000..4e8a21a8fa --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java @@ -0,0 +1,358 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto; +import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Slf4j +@RunWith(MockitoJUnitRunner.class) +public class RuleChainMsgConstructorTest { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() throws JsonProcessingException { + RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); + RuleChainMsgConstructor constructor = new RuleChainMsgConstructor(); + RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = + constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData, EdgeVersion.V_3_3_3); + + Assert.assertEquals("First rule node index incorrect!", 3, ruleChainMetadataUpdateMsg.getFirstNodeIndex()); + Assert.assertEquals("Nodes count incorrect!", 12, ruleChainMetadataUpdateMsg.getNodesCount()); + Assert.assertEquals("Connections count incorrect!", 13, ruleChainMetadataUpdateMsg.getConnectionsCount()); + Assert.assertEquals("Rule chain connections count incorrect!", 0, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount()); + + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 6, "Success"), ruleChainMetadataUpdateMsg.getConnections(0)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 10, "Success"), ruleChainMetadataUpdateMsg.getConnections(1)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(2)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(3)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(4)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 11, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(5)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 7, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(6)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 4, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(7)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 5, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(8)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 8, "Other"), ruleChainMetadataUpdateMsg.getConnections(9)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 9, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(10)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(7, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(11)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(10, 9, "RPC"), ruleChainMetadataUpdateMsg.getConnections(12)); + } + + @Test + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() throws JsonProcessingException { + RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); + RuleChainMsgConstructor constructor = new RuleChainMsgConstructor(); + RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = + constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData, EdgeVersion.V_3_3_0); + + Assert.assertEquals("First rule node index incorrect!", 2, ruleChainMetadataUpdateMsg.getFirstNodeIndex()); + Assert.assertEquals("Nodes count incorrect!", 10, ruleChainMetadataUpdateMsg.getNodesCount()); + Assert.assertEquals("Connections count incorrect!", 10, ruleChainMetadataUpdateMsg.getConnectionsCount()); + Assert.assertEquals("Rule chain connections count incorrect!", 1, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount()); + + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(2, 5, "Success"), ruleChainMetadataUpdateMsg.getConnections(0)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(1)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(2)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 9, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(3)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 6, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(4)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 3, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(5)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 4, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(6)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 7, "Other"), ruleChainMetadataUpdateMsg.getConnections(7)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 8, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(8)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(9)); + + RuleChainConnectionInfoProto ruleChainConnection = ruleChainMetadataUpdateMsg.getRuleChainConnections(0); + Assert.assertEquals("From index incorrect!", 2, ruleChainConnection.getFromIndex()); + Assert.assertEquals("Type index incorrect!", "Success", ruleChainConnection.getType()); + Assert.assertEquals("Additional info incorrect!", + "{\"description\":\"\",\"layoutX\":477,\"layoutY\":560,\"ruleChainNodeId\":\"rule-chain-node-UNDEFINED\"}", + ruleChainConnection.getAdditionalInfo()); + Assert.assertTrue("Target rule chain id MSB incorrect!", ruleChainConnection.getTargetRuleChainIdMSB() != 0); + Assert.assertTrue("Target rule chain id LSB incorrect!", ruleChainConnection.getTargetRuleChainIdLSB() != 0); + } + + @Test + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() throws JsonProcessingException { + // same rule chain metadata, but different order of rule nodes + RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); + RuleChainMsgConstructor constructor = new RuleChainMsgConstructor(); + RuleChainMetaData ruleChainMetaData1 = createRuleChainMetaData(ruleChainId, 8, createRuleNodesInDifferentOrder(ruleChainId), createConnectionsInDifferentOrder()); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = + constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData1, EdgeVersion.V_3_3_0); + + Assert.assertEquals("First rule node index incorrect!", 7, ruleChainMetadataUpdateMsg.getFirstNodeIndex()); + Assert.assertEquals("Nodes count incorrect!", 10, ruleChainMetadataUpdateMsg.getNodesCount()); + Assert.assertEquals("Connections count incorrect!", 10, ruleChainMetadataUpdateMsg.getConnectionsCount()); + Assert.assertEquals("Rule chain connections count incorrect!", 1, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount()); + + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(0)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 0, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(1)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 3, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(2)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 6, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(3)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 5, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(4)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 2, "Other"), ruleChainMetadataUpdateMsg.getConnections(5)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 1, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(6)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(7)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(8)); + compareNodeConnectionInfoAndProto(createNodeConnectionInfo(7, 4, "Success"), ruleChainMetadataUpdateMsg.getConnections(9)); + + RuleChainConnectionInfoProto ruleChainConnection = ruleChainMetadataUpdateMsg.getRuleChainConnections(0); + Assert.assertEquals("From index incorrect!", 7, ruleChainConnection.getFromIndex()); + Assert.assertEquals("Type index incorrect!", "Success", ruleChainConnection.getType()); + Assert.assertEquals("Additional info incorrect!", + "{\"description\":\"\",\"layoutX\":477,\"layoutY\":560,\"ruleChainNodeId\":\"rule-chain-node-UNDEFINED\"}", + ruleChainConnection.getAdditionalInfo()); + Assert.assertTrue("Target rule chain id MSB incorrect!", ruleChainConnection.getTargetRuleChainIdMSB() != 0); + Assert.assertTrue("Target rule chain id LSB incorrect!", ruleChainConnection.getTargetRuleChainIdLSB() != 0); + } + + private void compareNodeConnectionInfoAndProto(NodeConnectionInfo expected, org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto actual) { + Assert.assertEquals(expected.getFromIndex(), actual.getFromIndex()); + Assert.assertEquals(expected.getToIndex(), actual.getToIndex()); + Assert.assertEquals(expected.getType(), actual.getType()); + } + + private RuleChainMetaData createRuleChainMetaData(RuleChainId ruleChainId, Integer firstNodeIndex, List nodes, List connections) { + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(ruleChainId); + ruleChainMetaData.setFirstNodeIndex(firstNodeIndex); + ruleChainMetaData.setNodes(nodes); + ruleChainMetaData.setConnections(connections); + ruleChainMetaData.setRuleChainConnections(null); + return ruleChainMetaData; + } + + private List createConnections() { + List result = new ArrayList<>(); + result.add(createNodeConnectionInfo(3, 6, "Success")); + result.add(createNodeConnectionInfo(3, 10, "Success")); + result.add(createNodeConnectionInfo(3, 0, "Success")); + result.add(createNodeConnectionInfo(4, 11, "Success")); + result.add(createNodeConnectionInfo(5, 11, "Success")); + result.add(createNodeConnectionInfo(6, 11, "Attributes Updated")); + result.add(createNodeConnectionInfo(6, 7, "RPC Request from Device")); + result.add(createNodeConnectionInfo(6, 4, "Post telemetry")); + result.add(createNodeConnectionInfo(6, 5, "Post attributes")); + result.add(createNodeConnectionInfo(6, 8, "Other")); + result.add(createNodeConnectionInfo(6, 9, "RPC Request to Device")); + result.add(createNodeConnectionInfo(7, 11, "Success")); + result.add(createNodeConnectionInfo(10, 9, "RPC")); + return result; + } + + private NodeConnectionInfo createNodeConnectionInfo(int fromIndex, int toIndex, String type) { + NodeConnectionInfo result = new NodeConnectionInfo(); + result.setFromIndex(fromIndex); + result.setToIndex(toIndex); + result.setType(type); + return result; + } + + private List createRuleNodes(RuleChainId ruleChainId) throws JsonProcessingException { + List result = new ArrayList<>(); + result.add(getOutputNode(ruleChainId)); + result.add(getAcknowledgeNode(ruleChainId)); + result.add(getCheckpointNode(ruleChainId)); + result.add(getDeviceProfileNode(ruleChainId)); + result.add(getSaveTimeSeriesNode(ruleChainId)); + result.add(getSaveClientAttributesNode(ruleChainId)); + result.add(getMessageTypeSwitchNode(ruleChainId)); + result.add(getLogRpcFromDeviceNode(ruleChainId)); + result.add(getLogOtherNode(ruleChainId)); + result.add(getRpcCallRequestNode(ruleChainId)); + result.add(getPushToAnalyticsNode(ruleChainId)); + result.add(getPushToCloudNode(ruleChainId)); + return result; + } + + private RuleNode createRuleNode(RuleChainId ruleChainId, String type, String name, JsonNode configuration, JsonNode additionalInfo) { + RuleNode e = new RuleNode(); + e.setRuleChainId(ruleChainId); + e.setType(type); + e.setName(name); + e.setDebugMode(false); + e.setConfiguration(configuration); + e.setAdditionalInfo(additionalInfo); + e.setId(new RuleNodeId(UUID.randomUUID())); + return e; + } + + private List createConnectionsInDifferentOrder() { + List result = new ArrayList<>(); + result.add(createNodeConnectionInfo(0, 2, "RPC")); + result.add(createNodeConnectionInfo(4, 1, "Success")); + result.add(createNodeConnectionInfo(5, 1, "Attributes Updated")); + result.add(createNodeConnectionInfo(5, 4, "RPC Request from Device")); + result.add(createNodeConnectionInfo(5, 7, "Post telemetry")); + result.add(createNodeConnectionInfo(5, 6, "Post attributes")); + result.add(createNodeConnectionInfo(5, 3, "Other")); + result.add(createNodeConnectionInfo(5, 2, "RPC Request to Device")); + result.add(createNodeConnectionInfo(6, 1, "Success")); + result.add(createNodeConnectionInfo(7, 1, "Success")); + result.add(createNodeConnectionInfo(8, 11, "Success")); + result.add(createNodeConnectionInfo(8, 5, "Success")); + result.add(createNodeConnectionInfo(8, 0, "Success")); + return result; + } + + private List createRuleNodesInDifferentOrder(RuleChainId ruleChainId) throws JsonProcessingException { + List result = new ArrayList<>(); + result.add(getPushToAnalyticsNode(ruleChainId)); + result.add(getPushToCloudNode(ruleChainId)); + result.add(getRpcCallRequestNode(ruleChainId)); + result.add(getLogOtherNode(ruleChainId)); + result.add(getLogRpcFromDeviceNode(ruleChainId)); + result.add(getMessageTypeSwitchNode(ruleChainId)); + result.add(getSaveClientAttributesNode(ruleChainId)); + result.add(getSaveTimeSeriesNode(ruleChainId)); + result.add(getDeviceProfileNode(ruleChainId)); + result.add(getCheckpointNode(ruleChainId)); + result.add(getAcknowledgeNode(ruleChainId)); + result.add(getOutputNode(ruleChainId)); + return result; + } + + + @NotNull + private RuleNode getOutputNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode", + "Output node", + mapper.readTree("{\"version\":0}"), + mapper.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}")); + } + + @NotNull + private RuleNode getCheckpointNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.flow.TbCheckpointNode", + "Checkpoint node", + mapper.readTree("{\"queueName\":\"HighPriority\"}"), + mapper.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}")); + } + + @NotNull + private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "Save Timeseries", + mapper.readTree("{\"defaultTTL\":0}"), + mapper.readTree("{\"layoutX\":823,\"layoutY\":157}")); + } + + @NotNull + private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "Message Type Switch", + mapper.readTree("{\"version\":0}"), + mapper.readTree("{\"layoutX\":347,\"layoutY\":149}")); + } + + @NotNull + private RuleNode getLogOtherNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.action.TbLogNode", + "Log Other", + mapper.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + mapper.readTree("{\"layoutX\":824,\"layoutY\":378}")); + } + + @NotNull + private RuleNode getPushToCloudNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", + "Push to cloud", + mapper.readTree("{\"scope\":\"SERVER_SCOPE\"}"), + mapper.readTree("{\"layoutX\":1129,\"layoutY\":52}")); + } + + @NotNull + private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.flow.TbAckNode", + "Acknowledge node", + mapper.readTree("{\"version\":0}"), + mapper.readTree("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}")); + } + + @NotNull + private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", + "Device Profile Node", + mapper.readTree("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"), + mapper.readTree("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}")); + } + + @NotNull + private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", + "Save Client Attributes", + mapper.readTree("{\"scope\":\"CLIENT_SCOPE\"}"), + mapper.readTree("{\"layoutX\":824,\"layoutY\":52}")); + } + + @NotNull + private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.action.TbLogNode", + "Log RPC from Device", + mapper.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + mapper.readTree("{\"layoutX\":825,\"layoutY\":266}")); + } + + @NotNull + private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", + "RPC Call Request", + mapper.readTree("{\"timeoutInSeconds\":60}"), + mapper.readTree("{\"layoutX\":824,\"layoutY\":466}")); + } + + @NotNull + private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) throws JsonProcessingException { + return createRuleNode(ruleChainId, + "org.thingsboard.rule.engine.flow.TbRuleChainInputNode", + "Push to Analytics", + mapper.readTree("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"), + mapper.readTree("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}")); + } +} \ No newline at end of file diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 8586d9b920..e54b5d5765 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -30,6 +30,11 @@ service EdgeRpcService { } +enum EdgeVersion { + V_3_3_0 = 0; + V_3_3_3 = 1; +} + /** * Data Structures; */ @@ -61,6 +66,7 @@ message EdgeUpdateMsg { message ConnectRequestMsg { string edgeRoutingKey = 1; string edgeSecret = 2; + EdgeVersion edgeVersion = 3; } enum ConnectResponseCode {