diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java index 206b203682..bd351ffecd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java @@ -76,7 +76,10 @@ public enum TbMsgType { DEVICE_UPDATE_SELF_MSG(null, true), DEDUPLICATION_TIMEOUT_SELF_MSG(null, true), DELAY_TIMEOUT_SELF_MSG(null, true), - MSG_COUNT_SELF_MSG(null, true); + MSG_COUNT_SELF_MSG(null, true), + + // Custom or N/A type: + CUSTOM_OR_NA_TYPE(null, false, true); public static final List NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream() .filter(tbMsgType -> !tbMsgType.isTellSelfOnly()) @@ -90,26 +93,32 @@ public enum TbMsgType { @Getter private final boolean tellSelfOnly; + @Getter + private final boolean customType; + + TbMsgType(String ruleNodeConnection, boolean tellSelfOnly, boolean customType) { + this.ruleNodeConnection = ruleNodeConnection; + this.tellSelfOnly = tellSelfOnly; + this.customType = customType; + } + TbMsgType(String ruleNodeConnection, boolean tellSelfOnly) { this.ruleNodeConnection = ruleNodeConnection; this.tellSelfOnly = tellSelfOnly; + this.customType = false; } TbMsgType(String ruleNodeConnection) { this.ruleNodeConnection = ruleNodeConnection; this.tellSelfOnly = false; + this.customType = false; } - public static String getRuleNodeConnectionOrElseOther(String msgType) { - if (msgType == null) { + public static String getRuleNodeConnectionOrElseOther(TbMsgType msgType) { + if (msgType == null || msgType.isCustomType() || msgType.isTellSelfOnly()) { return TbNodeConnectionType.OTHER; - } else { - return Arrays.stream(TbMsgType.values()) - .filter(type -> type.name().equals(msgType)) - .findFirst() - .map(TbMsgType::getRuleNodeConnection) - .orElse(TbNodeConnectionType.OTHER); } + return Objects.requireNonNullElse(msgType.getRuleNodeConnection(), TbNodeConnectionType.OTHER); } } diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java index a37eb31d72..ff41505266 100644 --- a/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java +++ b/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java @@ -22,6 +22,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM; import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_DELETE; +import static org.thingsboard.server.common.data.msg.TbMsgType.CUSTOM_OR_NA_TYPE; import static org.thingsboard.server.common.data.msg.TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG; import static org.thingsboard.server.common.data.msg.TbMsgType.DELAY_TIMEOUT_SELF_MSG; import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED_TO_EDGE; @@ -51,11 +52,12 @@ class TbMsgTypeTest { DEVICE_UPDATE_SELF_MSG, DEDUPLICATION_TIMEOUT_SELF_MSG, DELAY_TIMEOUT_SELF_MSG, - MSG_COUNT_SELF_MSG + MSG_COUNT_SELF_MSG, + CUSTOM_OR_NA_TYPE ); // backward-compatibility tests - + @Test void getRuleNodeConnectionsTest() { var tbMsgTypes = TbMsgType.values(); @@ -75,13 +77,25 @@ class TbMsgTypeTest { var tbMsgTypes = TbMsgType.values(); for (var type : tbMsgTypes) { if (typesWithNullRuleNodeConnection.contains(type)) { - assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type.name())) + assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)) .isEqualTo(TbNodeConnectionType.OTHER); } else { - assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type.name())).isNotNull() + assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)).isNotNull() .isNotEqualTo(TbNodeConnectionType.OTHER); } } } - + + @Test + void getCustomTypeTest() { + var tbMsgTypes = TbMsgType.values(); + for (var type : tbMsgTypes) { + if (type.equals(CUSTOM_OR_NA_TYPE)) { + assertThat(type.isCustomType()).isTrue(); + continue; + } + assertThat(type.isCustomType()).isFalse(); + } + } + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index a987a4a253..b4f6ccd584 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -52,6 +52,7 @@ public final class TbMsg implements Serializable { private final UUID id; private final long ts; private final String type; + private final TbMsgType internalType; private final EntityId originator; private final CustomerId customerId; private final TbMsgMetaData metaData; @@ -117,7 +118,7 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId, + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY); } @@ -126,7 +127,7 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY); } @@ -205,12 +206,12 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId, + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY); } public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), dataType, data, null, null, null, TbMsgCallback.EMPTY); } @@ -255,17 +256,17 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, null, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY); } public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, null, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, callback); } public static TbMsg transformMsg(TbMsg tbMsg, TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type.name(), originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback); } @@ -315,6 +316,36 @@ public final class TbMsg implements Serializable { tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY); } + private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, + RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) { + this.id = id; + this.queueName = queueName; + if (ts > 0) { + this.ts = ts; + } else { + this.ts = System.currentTimeMillis(); + } + this.internalType = internalType; + this.type = internalType.name(); + this.originator = originator; + if (customerId == null || customerId.isNullUid()) { + if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) { + this.customerId = (CustomerId) originator; + } else { + this.customerId = null; + } + } else { + this.customerId = customerId; + } + this.metaData = metaData; + this.dataType = dataType; + this.data = data; + this.ruleChainId = ruleChainId; + this.ruleNodeId = ruleNodeId; + this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx(); + this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); + } + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) { this.id = id; @@ -325,6 +356,7 @@ public final class TbMsg implements Serializable { this.ts = System.currentTimeMillis(); } this.type = type; + this.internalType = getInternalType(); this.originator = originator; if (customerId == null || customerId.isNullUid()) { if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) { @@ -468,8 +500,19 @@ public final class TbMsg implements Serializable { return ts; } + public TbMsgType getInternalType() { + if (internalType != null) { + return internalType; + } + try { + return TbMsgType.valueOf(type); + } catch (IllegalArgumentException e) { + return TbMsgType.CUSTOM_OR_NA_TYPE; + } + } + public boolean isTypeOf(TbMsgType tbMsgType) { - return tbMsgType != null && tbMsgType.name().equals(this.type); + return tbMsgType != null && tbMsgType.equals(getInternalType()); } public boolean isTypeOneOf(TbMsgType... types) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 2121e0c5fa..068d342ea7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -50,7 +50,7 @@ public class TbMsgTypeSwitchNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getType())); + ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType())); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java index c4fc8cd76d..7861b2f489 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java @@ -81,9 +81,10 @@ class TbMsgTypeSwitchNodeTest { var msg = resultMsgs.get(i); assertThat(msg).isNotNull(); assertThat(msg.getType()).isNotNull(); + assertThat(msg.getType()).isEqualTo(msg.getInternalType().name()); assertThat(msg).isSameAs(tbMsgList.get(i)); assertThat(resultNodeConnections.get(i)) - .isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getType())); + .isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType())); } }