diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java index bd351ffecd..f7c9a5b05f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java @@ -16,11 +16,10 @@ package org.thingsboard.server.common.data.msg; import lombok.Getter; +import org.thingsboard.server.common.data.StringUtils; -import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; public enum TbMsgType { @@ -79,12 +78,12 @@ public enum TbMsgType { MSG_COUNT_SELF_MSG(null, true), // Custom or N/A type: - CUSTOM_OR_NA_TYPE(null, false, true); + CUSTOM_OR_NA_TYPE(null, false); public static final List NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream() .filter(tbMsgType -> !tbMsgType.isTellSelfOnly()) .map(TbMsgType::getRuleNodeConnection) - .filter(Objects::nonNull) + .filter(connection -> !TbNodeConnectionType.OTHER.equals(connection)) .collect(Collectors.toUnmodifiableList()); @Getter @@ -93,32 +92,13 @@ public enum TbMsgType { @Getter private final boolean tellSelfOnly; - @Getter - private final boolean customType; - - TbMsgType(String ruleNodeConnection, boolean tellSelfOnly, boolean customType) { - this.ruleNodeConnection = ruleNodeConnection; - this.tellSelfOnly = tellSelfOnly; - this.customType = customType; - } - TbMsgType(String ruleNodeConnection, boolean tellSelfOnly) { - this.ruleNodeConnection = ruleNodeConnection; + this.ruleNodeConnection = StringUtils.isNotEmpty(ruleNodeConnection) ? ruleNodeConnection : TbNodeConnectionType.OTHER; this.tellSelfOnly = tellSelfOnly; - this.customType = false; } TbMsgType(String ruleNodeConnection) { - this.ruleNodeConnection = ruleNodeConnection; - this.tellSelfOnly = false; - this.customType = false; - } - - public static String getRuleNodeConnectionOrElseOther(TbMsgType msgType) { - if (msgType == null || msgType.isCustomType() || msgType.isTellSelfOnly()) { - return TbNodeConnectionType.OTHER; - } - return Objects.requireNonNullElse(msgType.getRuleNodeConnection(), TbNodeConnectionType.OTHER); + this(ruleNodeConnection, false); } } diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java index ff41505266..c77d109814 100644 --- a/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java +++ b/common/data/src/test/java/org/thingsboard/server/common/data/msg/TbMsgTypeTest.java @@ -63,39 +63,25 @@ class TbMsgTypeTest { var tbMsgTypes = TbMsgType.values(); for (var type : tbMsgTypes) { if (typesWithNullRuleNodeConnection.contains(type)) { - assertThat(type.getRuleNodeConnection()).isNull(); + assertThat(type.getRuleNodeConnection()).isEqualTo(TbNodeConnectionType.OTHER); } else { - assertThat(type.getRuleNodeConnection()).isNotNull(); + assertThat(type.getRuleNodeConnection()).isNotEqualTo(TbNodeConnectionType.OTHER); } } } @Test void getRuleNodeConnectionOrElseOtherTest() { - assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(null)) - .isEqualTo(TbNodeConnectionType.OTHER); var tbMsgTypes = TbMsgType.values(); for (var type : tbMsgTypes) { if (typesWithNullRuleNodeConnection.contains(type)) { - assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)) + assertThat(type.getRuleNodeConnection()) .isEqualTo(TbNodeConnectionType.OTHER); } else { - assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)).isNotNull() + assertThat(type.getRuleNodeConnection()).isNotNull() .isNotEqualTo(TbNodeConnectionType.OTHER); } } } - @Test - void getCustomTypeTest() { - var tbMsgTypes = TbMsgType.values(); - for (var type : tbMsgTypes) { - if (type.equals(CUSTOM_OR_NA_TYPE)) { - assertThat(type.isCustomType()).isTrue(); - continue; - } - assertThat(type.isCustomType()).isFalse(); - } - } - } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index b4f6ccd584..afd3d0268c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -98,7 +98,7 @@ public final class TbMsg implements Serializable { */ @Deprecated(since = "3.5.2") public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY); } @@ -109,7 +109,7 @@ public final class TbMsg implements Serializable { @Deprecated(since = "3.5.2", forRemoval = true) public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY); } @@ -171,13 +171,13 @@ public final class TbMsg implements Serializable { */ @Deprecated(since = "3.5.2") public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY); } @Deprecated(since = "3.5.2", forRemoval = true) public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId, metaData.copy(), dataType, data, null, null, null, TbMsgCallback.EMPTY); } @@ -223,13 +223,13 @@ public final class TbMsg implements Serializable { @Deprecated(since = "3.5.2", forRemoval = true) public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, null, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY); } @Deprecated(since = "3.5.2", forRemoval = true) public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { - return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, + return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, callback); } @@ -251,7 +251,7 @@ public final class TbMsg implements Serializable { */ @Deprecated(since = "3.5.2") public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, null, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback); } @@ -271,82 +271,57 @@ public final class TbMsg implements Serializable { } public static TbMsg transformMsgOriginator(TbMsg tbMsg, EntityId originatorId) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, originatorId, tbMsg.getCustomerId(), tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, originatorId, tbMsg.getCustomerId(), tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsgData(TbMsg tbMsg, String data) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsgMetadata(TbMsg tbMsg, TbMsgMetaData metadata) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata.copy(), tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata.copy(), tbMsg.dataType, tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, TbMsgMetaData metadata, String data) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata, tbMsg.dataType, data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsgCustomerId(TbMsg tbMsg, CustomerId customerId) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsgRuleChainId(TbMsg tbMsg, RuleChainId ruleChainId) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsgQueueName(TbMsg tbMsg, String queueName) { - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) { - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback()); } //used for enqueueForTellNext public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(), + return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getInternalType(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY); } private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) { - this.id = id; - this.queueName = queueName; - if (ts > 0) { - this.ts = ts; - } else { - this.ts = System.currentTimeMillis(); - } - this.internalType = internalType; - this.type = internalType.name(); - this.originator = originator; - if (customerId == null || customerId.isNullUid()) { - if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) { - this.customerId = (CustomerId) originator; - } else { - this.customerId = null; - } - } else { - this.customerId = customerId; - } - this.metaData = metaData; - this.dataType = dataType; - this.data = data; - this.ruleChainId = ruleChainId; - this.ruleNodeId = ruleNodeId; - this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx(); - this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); + this(queueName, id, ts, internalType, internalType.name(), originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, ctx, callback); } - private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, + private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) { this.id = id; this.queueName = queueName; @@ -356,7 +331,7 @@ public final class TbMsg implements Serializable { this.ts = System.currentTimeMillis(); } this.type = type; - this.internalType = getInternalType(); + this.internalType = internalType != null ? internalType : getInternalType(type); this.originator = originator; if (customerId == null || customerId.isNullUid()) { if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) { @@ -442,7 +417,7 @@ public final class TbMsg implements Serializable { } TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId, + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, ctx, callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); @@ -454,17 +429,17 @@ public final class TbMsg implements Serializable { } public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) { - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, + return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, null, this.ctx, callback); } public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) { - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, + return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback); } public TbMsg copyWithNewCtx() { - return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId, + return new TbMsg(this.queueName, this.id, this.ts, this.internalType, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY); } @@ -500,10 +475,7 @@ public final class TbMsg implements Serializable { return ts; } - public TbMsgType getInternalType() { - if (internalType != null) { - return internalType; - } + private TbMsgType getInternalType(String type) { try { return TbMsgType.valueOf(type); } catch (IllegalArgumentException e) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 068d342ea7..d5b06b4537 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -50,7 +49,7 @@ public class TbMsgTypeSwitchNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType())); + ctx.tellNext(msg, msg.getInternalType().getRuleNodeConnection()); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java index 7861b2f489..603c23cd05 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNodeTest.java @@ -84,7 +84,7 @@ class TbMsgTypeSwitchNodeTest { assertThat(msg.getType()).isEqualTo(msg.getInternalType().name()); assertThat(msg).isSameAs(tbMsgList.get(i)); assertThat(resultNodeConnections.get(i)) - .isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType())); + .isEqualTo(msg.getInternalType().getRuleNodeConnection()); } }