Added functionality to support 3.3.0 edge version rule chains

This commit is contained in:
Volodymyr Babak 2022-01-12 15:42:43 +02:00
parent 8ec7fd5de8
commit 949d306560
5 changed files with 546 additions and 25 deletions

View File

@ -52,6 +52,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.gen.edge.v1.EdgeUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.edge.v1.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
@ -73,14 +74,11 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@ -109,6 +107,8 @@ public final class EdgeGrpcSession implements Closeable {
private boolean connected;
private boolean syncCompleted;
private EdgeVersion edgeVersion;
private ScheduledExecutorService sendDownlinkExecutorService;
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
@ -525,7 +525,7 @@ public final class EdgeGrpcSession implements Closeable {
case RULE_CHAIN:
return ctx.getRuleChainProcessor().processRuleChainToEdge(edge, edgeEvent, msgType, action);
case RULE_CHAIN_METADATA:
return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType);
return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType, this.edgeVersion);
case ALARM:
return ctx.getAlarmProcessor().processAlarmToEdge(edge, edgeEvent, msgType, action);
case USER:
@ -655,6 +655,7 @@ public final class EdgeGrpcSession implements Closeable {
try {
if (edge.getSecret().equals(request.getEdgeSecret())) {
sessionOpenListener.accept(edge.getId(), this);
this.edgeVersion = request.getEdgeVersion();
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.ACCEPTED)
.setErrorMsg("")

View File

@ -17,15 +17,18 @@ package org.thingsboard.server.service.edge.rpc.constructor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto;
import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
@ -36,6 +39,10 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
@Component
@Slf4j
@ -43,6 +50,8 @@ import java.util.List;
public class RuleChainMsgConstructor {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String RULE_CHAIN_INPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainInputNode";
private static final String TB_RULE_CHAIN_OUTPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode";
public RuleChainUpdateMsg constructRuleChainUpdatedMsg(RuleChainId edgeRootRuleChainId, UpdateMsgType msgType, RuleChain ruleChain) {
RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder()
@ -60,18 +69,19 @@ public class RuleChainMsgConstructor {
return builder.build();
}
public RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
public RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType,
RuleChainMetaData ruleChainMetaData,
EdgeVersion edgeVersion) {
try {
RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder()
.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections()));
if (ruleChainMetaData.getFirstNodeIndex() != null) {
builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex());
} else {
builder.setFirstNodeIndex(-1);
RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder();
switch (edgeVersion) {
case V_3_3_0:
constructRuleChainMetadataUpdatedMsg_V_3_3_0(builder, ruleChainMetaData);
break;
case V_3_3_3:
default:
constructRuleChainMetadataUpdatedMsg_V_3_3_3(builder, ruleChainMetaData);
break;
}
builder.setMsgType(msgType);
return builder.build();
@ -81,6 +91,104 @@ public class RuleChainMsgConstructor {
return null;
}
private void constructRuleChainMetadataUpdatedMsg_V_3_3_3(RuleChainMetadataUpdateMsg.Builder builder,
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
builder.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>()));
if (ruleChainMetaData.getFirstNodeIndex() != null) {
builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex());
} else {
builder.setFirstNodeIndex(-1);
}
}
private void constructRuleChainMetadataUpdatedMsg_V_3_3_0(RuleChainMetadataUpdateMsg.Builder builder,
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
List<RuleNode> supportedNodes = filterNodes_V_3_3_0(ruleChainMetaData.getNodes());
NavigableSet<Integer> removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections());
List<NodeConnectionInfo> connections = filterConnections_V_3_3_0(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections(), removedNodeIndexes);
List<RuleChainConnectionInfo> ruleChainConnections = new ArrayList<>();
if (ruleChainMetaData.getRuleChainConnections() != null) {
ruleChainConnections.addAll(ruleChainMetaData.getRuleChainConnections());
}
ruleChainConnections.addAll(addRuleChainConnections_V_3_3_0(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections()));
builder.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
.addAllNodes(constructNodes(supportedNodes))
.addAllConnections(constructConnections(connections))
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainConnections, removedNodeIndexes));
if (ruleChainMetaData.getFirstNodeIndex() != null) {
Integer firstNodeIndex = ruleChainMetaData.getFirstNodeIndex();
// decrease index because of removed nodes
for (Integer removedIndex : removedNodeIndexes) {
if (firstNodeIndex > removedIndex) {
firstNodeIndex = firstNodeIndex - 1;
}
}
builder.setFirstNodeIndex(firstNodeIndex);
} else {
builder.setFirstNodeIndex(-1);
}
}
private List<NodeConnectionInfo> filterConnections_V_3_3_0(List<RuleNode> nodes, List<NodeConnectionInfo> connections, NavigableSet<Integer> removedNodeIndexes) {
List<NodeConnectionInfo> result = new ArrayList<>();
if (connections != null) {
result = connections.stream().filter(conn -> {
for (int i = 0; i < nodes.size(); i++) {
RuleNode node = nodes.get(i);
if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE)
|| node.getType().equalsIgnoreCase(TB_RULE_CHAIN_OUTPUT_NODE)) {
if (conn.getFromIndex() == i || conn.getToIndex() == i) {
return false;
}
}
}
return true;
}).map(conn -> {
NodeConnectionInfo newConn = new NodeConnectionInfo();
newConn.setFromIndex(conn.getFromIndex());
newConn.setToIndex(conn.getToIndex());
newConn.setType(conn.getType());
return newConn;
}).collect(Collectors.toList());
}
// decrease index because of removed nodes
for (Integer removedIndex : removedNodeIndexes) {
for (NodeConnectionInfo newConn : result) {
if (newConn.getToIndex() > removedIndex) {
newConn.setToIndex(newConn.getToIndex() - 1);
}
if (newConn.getFromIndex() > removedIndex) {
newConn.setFromIndex(newConn.getFromIndex() - 1);
}
}
}
return result;
}
private NavigableSet<Integer> getRemovedNodeIndexes(List<RuleNode> nodes, List<NodeConnectionInfo> connections) {
TreeSet<Integer> removedIndexes = new TreeSet<>();
for (NodeConnectionInfo connection : connections) {
for (int i = 0; i < nodes.size(); i++) {
RuleNode node = nodes.get(i);
if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE)
|| node.getType().equalsIgnoreCase(TB_RULE_CHAIN_OUTPUT_NODE)) {
if (connection.getFromIndex() == i || connection.getToIndex() == i) {
removedIndexes.add(i);
}
}
}
}
return removedIndexes.descendingSet();
}
private List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
List<NodeConnectionInfoProto> result = new ArrayList<>();
if (connections != null && !connections.isEmpty()) {
@ -99,6 +207,21 @@ public class RuleChainMsgConstructor {
.build();
}
private List<RuleNode> filterNodes_V_3_3_0(List<RuleNode> nodes) {
List<RuleNode> result = new ArrayList<>();
for (RuleNode node : nodes) {
switch (node.getType()) {
case RULE_CHAIN_INPUT_NODE:
case TB_RULE_CHAIN_OUTPUT_NODE:
log.trace("Skipping not supported rule node {}", node);
break;
default:
result.add(node);
}
}
return result;
}
private List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
List<RuleNodeProto> result = new ArrayList<>();
if (nodes != null && !nodes.isEmpty()) {
@ -109,23 +232,55 @@ public class RuleChainMsgConstructor {
return result;
}
private List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections) throws JsonProcessingException {
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
result.add(constructRuleChainConnection(ruleChainConnectionInfo));
private List<RuleChainConnectionInfo> addRuleChainConnections_V_3_3_0(List<RuleNode> nodes, List<NodeConnectionInfo> connections) throws JsonProcessingException {
List<RuleChainConnectionInfo> result = new ArrayList<>();
for (int i = 0; i < nodes.size(); i++) {
RuleNode node = nodes.get(i);
if (node.getType().equalsIgnoreCase(RULE_CHAIN_INPUT_NODE)) {
for (NodeConnectionInfo connection : connections) {
if (connection.getToIndex() == i) {
RuleChainConnectionInfo e = new RuleChainConnectionInfo();
e.setFromIndex(connection.getFromIndex());
TbRuleChainInputNodeConfiguration configuration = JacksonUtil.treeToValue(node.getConfiguration(), TbRuleChainInputNodeConfiguration.class);
e.setTargetRuleChainId(new RuleChainId(UUID.fromString(configuration.getRuleChainId())));
e.setAdditionalInfo(node.getAdditionalInfo());
e.setType(connection.getType());
result.add(e);
}
}
}
}
return result;
}
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
private List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections, NavigableSet<Integer> removedNodeIndexes) throws JsonProcessingException {
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
result.add(constructRuleChainConnection(ruleChainConnectionInfo, removedNodeIndexes));
}
}
return result;
}
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo, NavigableSet<Integer> removedNodeIndexes) throws JsonProcessingException {
int fromIndex = ruleChainConnectionInfo.getFromIndex();
// decrease index because of removed nodes
for (Integer removedIndex : removedNodeIndexes) {
if (fromIndex > removedIndex) {
fromIndex = fromIndex - 1;
}
}
ObjectNode additionalInfo = (ObjectNode) ruleChainConnectionInfo.getAdditionalInfo();
if (additionalInfo.get("ruleChainNodeId") == null) {
additionalInfo.put("ruleChainNodeId", "rule-chain-node-UNDEFINED");
}
return RuleChainConnectionInfoProto.newBuilder()
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
.setFromIndex(fromIndex)
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
.setType(ruleChainConnectionInfo.getType())
.setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
.setAdditionalInfo(objectMapper.writeValueAsString(additionalInfo))
.build();
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
@ -63,14 +64,14 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
return downlinkMsg;
}
public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) {
public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeVersion edgeVersion) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId);
DownlinkMsg downlinkMsg = null;
if (ruleChain != null) {
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData, edgeVersion);
if (ruleChainMetadataUpdateMsg != null) {
downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())

View File

@ -0,0 +1,358 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.constructor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class RuleChainMsgConstructorTest {
private static final ObjectMapper mapper = new ObjectMapper();
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() throws JsonProcessingException {
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMsgConstructor constructor = new RuleChainMsgConstructor();
RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData, EdgeVersion.V_3_3_3);
Assert.assertEquals("First rule node index incorrect!", 3, ruleChainMetadataUpdateMsg.getFirstNodeIndex());
Assert.assertEquals("Nodes count incorrect!", 12, ruleChainMetadataUpdateMsg.getNodesCount());
Assert.assertEquals("Connections count incorrect!", 13, ruleChainMetadataUpdateMsg.getConnectionsCount());
Assert.assertEquals("Rule chain connections count incorrect!", 0, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount());
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 6, "Success"), ruleChainMetadataUpdateMsg.getConnections(0));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 10, "Success"), ruleChainMetadataUpdateMsg.getConnections(1));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(2));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(3));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(4));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 11, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(5));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 7, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(6));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 4, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(7));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 5, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(8));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 8, "Other"), ruleChainMetadataUpdateMsg.getConnections(9));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 9, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(10));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(7, 11, "Success"), ruleChainMetadataUpdateMsg.getConnections(11));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(10, 9, "RPC"), ruleChainMetadataUpdateMsg.getConnections(12));
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() throws JsonProcessingException {
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMsgConstructor constructor = new RuleChainMsgConstructor();
RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData, EdgeVersion.V_3_3_0);
Assert.assertEquals("First rule node index incorrect!", 2, ruleChainMetadataUpdateMsg.getFirstNodeIndex());
Assert.assertEquals("Nodes count incorrect!", 10, ruleChainMetadataUpdateMsg.getNodesCount());
Assert.assertEquals("Connections count incorrect!", 10, ruleChainMetadataUpdateMsg.getConnectionsCount());
Assert.assertEquals("Rule chain connections count incorrect!", 1, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount());
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(2, 5, "Success"), ruleChainMetadataUpdateMsg.getConnections(0));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(1));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(2));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 9, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(3));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 6, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(4));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 3, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(5));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 4, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(6));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 7, "Other"), ruleChainMetadataUpdateMsg.getConnections(7));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 8, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(8));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 9, "Success"), ruleChainMetadataUpdateMsg.getConnections(9));
RuleChainConnectionInfoProto ruleChainConnection = ruleChainMetadataUpdateMsg.getRuleChainConnections(0);
Assert.assertEquals("From index incorrect!", 2, ruleChainConnection.getFromIndex());
Assert.assertEquals("Type index incorrect!", "Success", ruleChainConnection.getType());
Assert.assertEquals("Additional info incorrect!",
"{\"description\":\"\",\"layoutX\":477,\"layoutY\":560,\"ruleChainNodeId\":\"rule-chain-node-UNDEFINED\"}",
ruleChainConnection.getAdditionalInfo());
Assert.assertTrue("Target rule chain id MSB incorrect!", ruleChainConnection.getTargetRuleChainIdMSB() != 0);
Assert.assertTrue("Target rule chain id LSB incorrect!", ruleChainConnection.getTargetRuleChainIdLSB() != 0);
}
@Test
public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() throws JsonProcessingException {
// same rule chain metadata, but different order of rule nodes
RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID());
RuleChainMsgConstructor constructor = new RuleChainMsgConstructor();
RuleChainMetaData ruleChainMetaData1 = createRuleChainMetaData(ruleChainId, 8, createRuleNodesInDifferentOrder(ruleChainId), createConnectionsInDifferentOrder());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
constructor.constructRuleChainMetadataUpdatedMsg(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetaData1, EdgeVersion.V_3_3_0);
Assert.assertEquals("First rule node index incorrect!", 7, ruleChainMetadataUpdateMsg.getFirstNodeIndex());
Assert.assertEquals("Nodes count incorrect!", 10, ruleChainMetadataUpdateMsg.getNodesCount());
Assert.assertEquals("Connections count incorrect!", 10, ruleChainMetadataUpdateMsg.getConnectionsCount());
Assert.assertEquals("Rule chain connections count incorrect!", 1, ruleChainMetadataUpdateMsg.getRuleChainConnectionsCount());
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(3, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(0));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 0, "Attributes Updated"), ruleChainMetadataUpdateMsg.getConnections(1));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 3, "RPC Request from Device"), ruleChainMetadataUpdateMsg.getConnections(2));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 6, "Post telemetry"), ruleChainMetadataUpdateMsg.getConnections(3));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 5, "Post attributes"), ruleChainMetadataUpdateMsg.getConnections(4));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 2, "Other"), ruleChainMetadataUpdateMsg.getConnections(5));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(4, 1, "RPC Request to Device"), ruleChainMetadataUpdateMsg.getConnections(6));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(5, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(7));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(6, 0, "Success"), ruleChainMetadataUpdateMsg.getConnections(8));
compareNodeConnectionInfoAndProto(createNodeConnectionInfo(7, 4, "Success"), ruleChainMetadataUpdateMsg.getConnections(9));
RuleChainConnectionInfoProto ruleChainConnection = ruleChainMetadataUpdateMsg.getRuleChainConnections(0);
Assert.assertEquals("From index incorrect!", 7, ruleChainConnection.getFromIndex());
Assert.assertEquals("Type index incorrect!", "Success", ruleChainConnection.getType());
Assert.assertEquals("Additional info incorrect!",
"{\"description\":\"\",\"layoutX\":477,\"layoutY\":560,\"ruleChainNodeId\":\"rule-chain-node-UNDEFINED\"}",
ruleChainConnection.getAdditionalInfo());
Assert.assertTrue("Target rule chain id MSB incorrect!", ruleChainConnection.getTargetRuleChainIdMSB() != 0);
Assert.assertTrue("Target rule chain id LSB incorrect!", ruleChainConnection.getTargetRuleChainIdLSB() != 0);
}
private void compareNodeConnectionInfoAndProto(NodeConnectionInfo expected, org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto actual) {
Assert.assertEquals(expected.getFromIndex(), actual.getFromIndex());
Assert.assertEquals(expected.getToIndex(), actual.getToIndex());
Assert.assertEquals(expected.getType(), actual.getType());
}
private RuleChainMetaData createRuleChainMetaData(RuleChainId ruleChainId, Integer firstNodeIndex, List<RuleNode> nodes, List<NodeConnectionInfo> connections) {
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(ruleChainId);
ruleChainMetaData.setFirstNodeIndex(firstNodeIndex);
ruleChainMetaData.setNodes(nodes);
ruleChainMetaData.setConnections(connections);
ruleChainMetaData.setRuleChainConnections(null);
return ruleChainMetaData;
}
private List<NodeConnectionInfo> createConnections() {
List<NodeConnectionInfo> result = new ArrayList<>();
result.add(createNodeConnectionInfo(3, 6, "Success"));
result.add(createNodeConnectionInfo(3, 10, "Success"));
result.add(createNodeConnectionInfo(3, 0, "Success"));
result.add(createNodeConnectionInfo(4, 11, "Success"));
result.add(createNodeConnectionInfo(5, 11, "Success"));
result.add(createNodeConnectionInfo(6, 11, "Attributes Updated"));
result.add(createNodeConnectionInfo(6, 7, "RPC Request from Device"));
result.add(createNodeConnectionInfo(6, 4, "Post telemetry"));
result.add(createNodeConnectionInfo(6, 5, "Post attributes"));
result.add(createNodeConnectionInfo(6, 8, "Other"));
result.add(createNodeConnectionInfo(6, 9, "RPC Request to Device"));
result.add(createNodeConnectionInfo(7, 11, "Success"));
result.add(createNodeConnectionInfo(10, 9, "RPC"));
return result;
}
private NodeConnectionInfo createNodeConnectionInfo(int fromIndex, int toIndex, String type) {
NodeConnectionInfo result = new NodeConnectionInfo();
result.setFromIndex(fromIndex);
result.setToIndex(toIndex);
result.setType(type);
return result;
}
private List<RuleNode> createRuleNodes(RuleChainId ruleChainId) throws JsonProcessingException {
List<RuleNode> result = new ArrayList<>();
result.add(getOutputNode(ruleChainId));
result.add(getAcknowledgeNode(ruleChainId));
result.add(getCheckpointNode(ruleChainId));
result.add(getDeviceProfileNode(ruleChainId));
result.add(getSaveTimeSeriesNode(ruleChainId));
result.add(getSaveClientAttributesNode(ruleChainId));
result.add(getMessageTypeSwitchNode(ruleChainId));
result.add(getLogRpcFromDeviceNode(ruleChainId));
result.add(getLogOtherNode(ruleChainId));
result.add(getRpcCallRequestNode(ruleChainId));
result.add(getPushToAnalyticsNode(ruleChainId));
result.add(getPushToCloudNode(ruleChainId));
return result;
}
private RuleNode createRuleNode(RuleChainId ruleChainId, String type, String name, JsonNode configuration, JsonNode additionalInfo) {
RuleNode e = new RuleNode();
e.setRuleChainId(ruleChainId);
e.setType(type);
e.setName(name);
e.setDebugMode(false);
e.setConfiguration(configuration);
e.setAdditionalInfo(additionalInfo);
e.setId(new RuleNodeId(UUID.randomUUID()));
return e;
}
private List<NodeConnectionInfo> createConnectionsInDifferentOrder() {
List<NodeConnectionInfo> result = new ArrayList<>();
result.add(createNodeConnectionInfo(0, 2, "RPC"));
result.add(createNodeConnectionInfo(4, 1, "Success"));
result.add(createNodeConnectionInfo(5, 1, "Attributes Updated"));
result.add(createNodeConnectionInfo(5, 4, "RPC Request from Device"));
result.add(createNodeConnectionInfo(5, 7, "Post telemetry"));
result.add(createNodeConnectionInfo(5, 6, "Post attributes"));
result.add(createNodeConnectionInfo(5, 3, "Other"));
result.add(createNodeConnectionInfo(5, 2, "RPC Request to Device"));
result.add(createNodeConnectionInfo(6, 1, "Success"));
result.add(createNodeConnectionInfo(7, 1, "Success"));
result.add(createNodeConnectionInfo(8, 11, "Success"));
result.add(createNodeConnectionInfo(8, 5, "Success"));
result.add(createNodeConnectionInfo(8, 0, "Success"));
return result;
}
private List<RuleNode> createRuleNodesInDifferentOrder(RuleChainId ruleChainId) throws JsonProcessingException {
List<RuleNode> result = new ArrayList<>();
result.add(getPushToAnalyticsNode(ruleChainId));
result.add(getPushToCloudNode(ruleChainId));
result.add(getRpcCallRequestNode(ruleChainId));
result.add(getLogOtherNode(ruleChainId));
result.add(getLogRpcFromDeviceNode(ruleChainId));
result.add(getMessageTypeSwitchNode(ruleChainId));
result.add(getSaveClientAttributesNode(ruleChainId));
result.add(getSaveTimeSeriesNode(ruleChainId));
result.add(getDeviceProfileNode(ruleChainId));
result.add(getCheckpointNode(ruleChainId));
result.add(getAcknowledgeNode(ruleChainId));
result.add(getOutputNode(ruleChainId));
return result;
}
@NotNull
private RuleNode getOutputNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbRuleChainOutputNode",
"Output node",
mapper.readTree("{\"version\":0}"),
mapper.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}"));
}
@NotNull
private RuleNode getCheckpointNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbCheckpointNode",
"Checkpoint node",
mapper.readTree("{\"queueName\":\"HighPriority\"}"),
mapper.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}"));
}
@NotNull
private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"Save Timeseries",
mapper.readTree("{\"defaultTTL\":0}"),
mapper.readTree("{\"layoutX\":823,\"layoutY\":157}"));
}
@NotNull
private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"Message Type Switch",
mapper.readTree("{\"version\":0}"),
mapper.readTree("{\"layoutX\":347,\"layoutY\":149}"));
}
@NotNull
private RuleNode getLogOtherNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.action.TbLogNode",
"Log Other",
mapper.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
mapper.readTree("{\"layoutX\":824,\"layoutY\":378}"));
}
@NotNull
private RuleNode getPushToCloudNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode",
"Push to cloud",
mapper.readTree("{\"scope\":\"SERVER_SCOPE\"}"),
mapper.readTree("{\"layoutX\":1129,\"layoutY\":52}"));
}
@NotNull
private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbAckNode",
"Acknowledge node",
mapper.readTree("{\"version\":0}"),
mapper.readTree("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}"));
}
@NotNull
private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"Device Profile Node",
mapper.readTree("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"),
mapper.readTree("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}"));
}
@NotNull
private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"Save Client Attributes",
mapper.readTree("{\"scope\":\"CLIENT_SCOPE\"}"),
mapper.readTree("{\"layoutX\":824,\"layoutY\":52}"));
}
@NotNull
private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.action.TbLogNode",
"Log RPC from Device",
mapper.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"),
mapper.readTree("{\"layoutX\":825,\"layoutY\":266}"));
}
@NotNull
private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"RPC Call Request",
mapper.readTree("{\"timeoutInSeconds\":60}"),
mapper.readTree("{\"layoutX\":824,\"layoutY\":466}"));
}
@NotNull
private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) throws JsonProcessingException {
return createRuleNode(ruleChainId,
"org.thingsboard.rule.engine.flow.TbRuleChainInputNode",
"Push to Analytics",
mapper.readTree("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"),
mapper.readTree("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}"));
}
}

View File

@ -30,6 +30,11 @@ service EdgeRpcService {
}
enum EdgeVersion {
V_3_3_0 = 0;
V_3_3_3 = 1;
}
/**
* Data Structures;
*/
@ -61,6 +66,7 @@ message EdgeUpdateMsg {
message ConnectRequestMsg {
string edgeRoutingKey = 1;
string edgeSecret = 2;
EdgeVersion edgeVersion = 3;
}
enum ConnectResponseCode {