refactoring after review
This commit is contained in:
parent
3d5cfa0c2e
commit
cebe1040d4
@ -16,11 +16,10 @@
|
||||
package org.thingsboard.server.common.data.msg;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public enum TbMsgType {
|
||||
@ -79,12 +78,12 @@ public enum TbMsgType {
|
||||
MSG_COUNT_SELF_MSG(null, true),
|
||||
|
||||
// Custom or N/A type:
|
||||
CUSTOM_OR_NA_TYPE(null, false, true);
|
||||
CUSTOM_OR_NA_TYPE(null, false);
|
||||
|
||||
public static final List<String> NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream()
|
||||
.filter(tbMsgType -> !tbMsgType.isTellSelfOnly())
|
||||
.map(TbMsgType::getRuleNodeConnection)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(connection -> !TbNodeConnectionType.OTHER.equals(connection))
|
||||
.collect(Collectors.toUnmodifiableList());
|
||||
|
||||
@Getter
|
||||
@ -93,32 +92,13 @@ public enum TbMsgType {
|
||||
@Getter
|
||||
private final boolean tellSelfOnly;
|
||||
|
||||
@Getter
|
||||
private final boolean customType;
|
||||
|
||||
TbMsgType(String ruleNodeConnection, boolean tellSelfOnly, boolean customType) {
|
||||
this.ruleNodeConnection = ruleNodeConnection;
|
||||
this.tellSelfOnly = tellSelfOnly;
|
||||
this.customType = customType;
|
||||
}
|
||||
|
||||
TbMsgType(String ruleNodeConnection, boolean tellSelfOnly) {
|
||||
this.ruleNodeConnection = ruleNodeConnection;
|
||||
this.ruleNodeConnection = StringUtils.isNotEmpty(ruleNodeConnection) ? ruleNodeConnection : TbNodeConnectionType.OTHER;
|
||||
this.tellSelfOnly = tellSelfOnly;
|
||||
this.customType = false;
|
||||
}
|
||||
|
||||
TbMsgType(String ruleNodeConnection) {
|
||||
this.ruleNodeConnection = ruleNodeConnection;
|
||||
this.tellSelfOnly = false;
|
||||
this.customType = false;
|
||||
}
|
||||
|
||||
public static String getRuleNodeConnectionOrElseOther(TbMsgType msgType) {
|
||||
if (msgType == null || msgType.isCustomType() || msgType.isTellSelfOnly()) {
|
||||
return TbNodeConnectionType.OTHER;
|
||||
}
|
||||
return Objects.requireNonNullElse(msgType.getRuleNodeConnection(), TbNodeConnectionType.OTHER);
|
||||
this(ruleNodeConnection, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -63,39 +63,25 @@ class TbMsgTypeTest {
|
||||
var tbMsgTypes = TbMsgType.values();
|
||||
for (var type : tbMsgTypes) {
|
||||
if (typesWithNullRuleNodeConnection.contains(type)) {
|
||||
assertThat(type.getRuleNodeConnection()).isNull();
|
||||
assertThat(type.getRuleNodeConnection()).isEqualTo(TbNodeConnectionType.OTHER);
|
||||
} else {
|
||||
assertThat(type.getRuleNodeConnection()).isNotNull();
|
||||
assertThat(type.getRuleNodeConnection()).isNotEqualTo(TbNodeConnectionType.OTHER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void getRuleNodeConnectionOrElseOtherTest() {
|
||||
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(null))
|
||||
.isEqualTo(TbNodeConnectionType.OTHER);
|
||||
var tbMsgTypes = TbMsgType.values();
|
||||
for (var type : tbMsgTypes) {
|
||||
if (typesWithNullRuleNodeConnection.contains(type)) {
|
||||
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type))
|
||||
assertThat(type.getRuleNodeConnection())
|
||||
.isEqualTo(TbNodeConnectionType.OTHER);
|
||||
} else {
|
||||
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)).isNotNull()
|
||||
assertThat(type.getRuleNodeConnection()).isNotNull()
|
||||
.isNotEqualTo(TbNodeConnectionType.OTHER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void getCustomTypeTest() {
|
||||
var tbMsgTypes = TbMsgType.values();
|
||||
for (var type : tbMsgTypes) {
|
||||
if (type.equals(CUSTOM_OR_NA_TYPE)) {
|
||||
assertThat(type.isCustomType()).isTrue();
|
||||
continue;
|
||||
}
|
||||
assertThat(type.isCustomType()).isFalse();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -98,7 +98,7 @@ public final class TbMsg implements Serializable {
|
||||
*/
|
||||
@Deprecated(since = "3.5.2")
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -109,7 +109,7 @@ public final class TbMsg implements Serializable {
|
||||
|
||||
@Deprecated(since = "3.5.2", forRemoval = true)
|
||||
public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -171,13 +171,13 @@ public final class TbMsg implements Serializable {
|
||||
*/
|
||||
@Deprecated(since = "3.5.2")
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.5.2", forRemoval = true)
|
||||
public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, customerId,
|
||||
metaData.copy(), dataType, data, null, null, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -223,13 +223,13 @@ public final class TbMsg implements Serializable {
|
||||
|
||||
@Deprecated(since = "3.5.2", forRemoval = true)
|
||||
public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, null,
|
||||
metaData.copy(), dataType, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@Deprecated(since = "3.5.2", forRemoval = true)
|
||||
public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,
|
||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), null, type, originator, null,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, callback);
|
||||
}
|
||||
|
||||
@ -251,7 +251,7 @@ public final class TbMsg implements Serializable {
|
||||
*/
|
||||
@Deprecated(since = "3.5.2")
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, null, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
||||
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
|
||||
}
|
||||
|
||||
@ -271,82 +271,57 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgOriginator(TbMsg tbMsg, EntityId originatorId) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, originatorId, tbMsg.getCustomerId(), tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, originatorId, tbMsg.getCustomerId(), tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgData(TbMsg tbMsg, String data) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgMetadata(TbMsg tbMsg, TbMsgMetaData metadata) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata.copy(), tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata.copy(), tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, TbMsgMetaData metadata, String data) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, metadata, tbMsg.dataType,
|
||||
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgCustomerId(TbMsg tbMsg, CustomerId customerId) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgRuleChainId(TbMsg tbMsg, RuleChainId ruleChainId) {
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsgQueueName(TbMsg tbMsg, String queueName) {
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) {
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.internalType, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
//used for enqueueForTellNext
|
||||
public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
|
||||
return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getInternalType(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
|
||||
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
||||
this.id = id;
|
||||
this.queueName = queueName;
|
||||
if (ts > 0) {
|
||||
this.ts = ts;
|
||||
} else {
|
||||
this.ts = System.currentTimeMillis();
|
||||
}
|
||||
this.internalType = internalType;
|
||||
this.type = internalType.name();
|
||||
this.originator = originator;
|
||||
if (customerId == null || customerId.isNullUid()) {
|
||||
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
|
||||
this.customerId = (CustomerId) originator;
|
||||
} else {
|
||||
this.customerId = null;
|
||||
}
|
||||
} else {
|
||||
this.customerId = customerId;
|
||||
}
|
||||
this.metaData = metaData;
|
||||
this.dataType = dataType;
|
||||
this.data = data;
|
||||
this.ruleChainId = ruleChainId;
|
||||
this.ruleNodeId = ruleNodeId;
|
||||
this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx();
|
||||
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
|
||||
this(queueName, id, ts, internalType, internalType.name(), originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, ctx, callback);
|
||||
}
|
||||
|
||||
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
||||
this.id = id;
|
||||
this.queueName = queueName;
|
||||
@ -356,7 +331,7 @@ public final class TbMsg implements Serializable {
|
||||
this.ts = System.currentTimeMillis();
|
||||
}
|
||||
this.type = type;
|
||||
this.internalType = getInternalType();
|
||||
this.internalType = internalType != null ? internalType : getInternalType(type);
|
||||
this.originator = originator;
|
||||
if (customerId == null || customerId.isNullUid()) {
|
||||
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
|
||||
@ -442,7 +417,7 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
||||
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId,
|
||||
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
|
||||
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, ctx, callback);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
||||
@ -454,17 +429,17 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) {
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId,
|
||||
this.metaData, this.dataType, this.data, ruleChainId, null, this.ctx, callback);
|
||||
}
|
||||
|
||||
public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) {
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId,
|
||||
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback);
|
||||
}
|
||||
|
||||
public TbMsg copyWithNewCtx() {
|
||||
return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId,
|
||||
return new TbMsg(this.queueName, this.id, this.ts, this.internalType, this.type, this.originator, this.customerId,
|
||||
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -500,10 +475,7 @@ public final class TbMsg implements Serializable {
|
||||
return ts;
|
||||
}
|
||||
|
||||
public TbMsgType getInternalType() {
|
||||
if (internalType != null) {
|
||||
return internalType;
|
||||
}
|
||||
private TbMsgType getInternalType(String type) {
|
||||
try {
|
||||
return TbMsgType.valueOf(type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
||||
@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
@ -50,7 +49,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType()));
|
||||
ctx.tellNext(msg, msg.getInternalType().getRuleNodeConnection());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ class TbMsgTypeSwitchNodeTest {
|
||||
assertThat(msg.getType()).isEqualTo(msg.getInternalType().name());
|
||||
assertThat(msg).isSameAs(tbMsgList.get(i));
|
||||
assertThat(resultNodeConnections.get(i))
|
||||
.isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType()));
|
||||
.isEqualTo(msg.getInternalType().getRuleNodeConnection());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user