added internal type to TbMsg to replace if-return blocks with switch-case
This commit is contained in:
parent
0817108fac
commit
3d5cfa0c2e
@ -76,7 +76,10 @@ public enum TbMsgType {
|
|||||||
DEVICE_UPDATE_SELF_MSG(null, true),
|
DEVICE_UPDATE_SELF_MSG(null, true),
|
||||||
DEDUPLICATION_TIMEOUT_SELF_MSG(null, true),
|
DEDUPLICATION_TIMEOUT_SELF_MSG(null, true),
|
||||||
DELAY_TIMEOUT_SELF_MSG(null, true),
|
DELAY_TIMEOUT_SELF_MSG(null, true),
|
||||||
MSG_COUNT_SELF_MSG(null, true);
|
MSG_COUNT_SELF_MSG(null, true),
|
||||||
|
|
||||||
|
// Custom or N/A type:
|
||||||
|
CUSTOM_OR_NA_TYPE(null, false, true);
|
||||||
|
|
||||||
public static final List<String> NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream()
|
public static final List<String> NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream()
|
||||||
.filter(tbMsgType -> !tbMsgType.isTellSelfOnly())
|
.filter(tbMsgType -> !tbMsgType.isTellSelfOnly())
|
||||||
@ -90,26 +93,32 @@ public enum TbMsgType {
|
|||||||
@Getter
|
@Getter
|
||||||
private final boolean tellSelfOnly;
|
private final boolean tellSelfOnly;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final boolean customType;
|
||||||
|
|
||||||
|
TbMsgType(String ruleNodeConnection, boolean tellSelfOnly, boolean customType) {
|
||||||
|
this.ruleNodeConnection = ruleNodeConnection;
|
||||||
|
this.tellSelfOnly = tellSelfOnly;
|
||||||
|
this.customType = customType;
|
||||||
|
}
|
||||||
|
|
||||||
TbMsgType(String ruleNodeConnection, boolean tellSelfOnly) {
|
TbMsgType(String ruleNodeConnection, boolean tellSelfOnly) {
|
||||||
this.ruleNodeConnection = ruleNodeConnection;
|
this.ruleNodeConnection = ruleNodeConnection;
|
||||||
this.tellSelfOnly = tellSelfOnly;
|
this.tellSelfOnly = tellSelfOnly;
|
||||||
|
this.customType = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TbMsgType(String ruleNodeConnection) {
|
TbMsgType(String ruleNodeConnection) {
|
||||||
this.ruleNodeConnection = ruleNodeConnection;
|
this.ruleNodeConnection = ruleNodeConnection;
|
||||||
this.tellSelfOnly = false;
|
this.tellSelfOnly = false;
|
||||||
|
this.customType = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRuleNodeConnectionOrElseOther(String msgType) {
|
public static String getRuleNodeConnectionOrElseOther(TbMsgType msgType) {
|
||||||
if (msgType == null) {
|
if (msgType == null || msgType.isCustomType() || msgType.isTellSelfOnly()) {
|
||||||
return TbNodeConnectionType.OTHER;
|
return TbNodeConnectionType.OTHER;
|
||||||
} else {
|
|
||||||
return Arrays.stream(TbMsgType.values())
|
|
||||||
.filter(type -> type.name().equals(msgType))
|
|
||||||
.findFirst()
|
|
||||||
.map(TbMsgType::getRuleNodeConnection)
|
|
||||||
.orElse(TbNodeConnectionType.OTHER);
|
|
||||||
}
|
}
|
||||||
|
return Objects.requireNonNullElse(msgType.getRuleNodeConnection(), TbNodeConnectionType.OTHER);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import java.util.List;
|
|||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM;
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM;
|
||||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_DELETE;
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_DELETE;
|
||||||
|
import static org.thingsboard.server.common.data.msg.TbMsgType.CUSTOM_OR_NA_TYPE;
|
||||||
import static org.thingsboard.server.common.data.msg.TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG;
|
import static org.thingsboard.server.common.data.msg.TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG;
|
||||||
import static org.thingsboard.server.common.data.msg.TbMsgType.DELAY_TIMEOUT_SELF_MSG;
|
import static org.thingsboard.server.common.data.msg.TbMsgType.DELAY_TIMEOUT_SELF_MSG;
|
||||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED_TO_EDGE;
|
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED_TO_EDGE;
|
||||||
@ -51,11 +52,12 @@ class TbMsgTypeTest {
|
|||||||
DEVICE_UPDATE_SELF_MSG,
|
DEVICE_UPDATE_SELF_MSG,
|
||||||
DEDUPLICATION_TIMEOUT_SELF_MSG,
|
DEDUPLICATION_TIMEOUT_SELF_MSG,
|
||||||
DELAY_TIMEOUT_SELF_MSG,
|
DELAY_TIMEOUT_SELF_MSG,
|
||||||
MSG_COUNT_SELF_MSG
|
MSG_COUNT_SELF_MSG,
|
||||||
|
CUSTOM_OR_NA_TYPE
|
||||||
);
|
);
|
||||||
|
|
||||||
// backward-compatibility tests
|
// backward-compatibility tests
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void getRuleNodeConnectionsTest() {
|
void getRuleNodeConnectionsTest() {
|
||||||
var tbMsgTypes = TbMsgType.values();
|
var tbMsgTypes = TbMsgType.values();
|
||||||
@ -75,13 +77,25 @@ class TbMsgTypeTest {
|
|||||||
var tbMsgTypes = TbMsgType.values();
|
var tbMsgTypes = TbMsgType.values();
|
||||||
for (var type : tbMsgTypes) {
|
for (var type : tbMsgTypes) {
|
||||||
if (typesWithNullRuleNodeConnection.contains(type)) {
|
if (typesWithNullRuleNodeConnection.contains(type)) {
|
||||||
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type.name()))
|
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type))
|
||||||
.isEqualTo(TbNodeConnectionType.OTHER);
|
.isEqualTo(TbNodeConnectionType.OTHER);
|
||||||
} else {
|
} else {
|
||||||
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type.name())).isNotNull()
|
assertThat(TbMsgType.getRuleNodeConnectionOrElseOther(type)).isNotNull()
|
||||||
.isNotEqualTo(TbNodeConnectionType.OTHER);
|
.isNotEqualTo(TbNodeConnectionType.OTHER);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void getCustomTypeTest() {
|
||||||
|
var tbMsgTypes = TbMsgType.values();
|
||||||
|
for (var type : tbMsgTypes) {
|
||||||
|
if (type.equals(CUSTOM_OR_NA_TYPE)) {
|
||||||
|
assertThat(type.isCustomType()).isTrue();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
assertThat(type.isCustomType()).isFalse();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -52,6 +52,7 @@ public final class TbMsg implements Serializable {
|
|||||||
private final UUID id;
|
private final UUID id;
|
||||||
private final long ts;
|
private final long ts;
|
||||||
private final String type;
|
private final String type;
|
||||||
|
private final TbMsgType internalType;
|
||||||
private final EntityId originator;
|
private final EntityId originator;
|
||||||
private final CustomerId customerId;
|
private final CustomerId customerId;
|
||||||
private final TbMsgMetaData metaData;
|
private final TbMsgMetaData metaData;
|
||||||
@ -117,7 +118,7 @@ public final class TbMsg implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId,
|
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||||
metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,7 +127,7 @@ public final class TbMsg implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId,
|
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,12 +206,12 @@ public final class TbMsg implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
public static TbMsg newMsg(String queueName, TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId,
|
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
|
public static TbMsg newMsg(TbMsgType type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
|
||||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, customerId,
|
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||||
metaData.copy(), dataType, data, null, null, null, TbMsgCallback.EMPTY);
|
metaData.copy(), dataType, data, null, null, null, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -255,17 +256,17 @@ public final class TbMsg implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, null,
|
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,
|
||||||
metaData.copy(), dataType, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
metaData.copy(), dataType, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
|
public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
|
||||||
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type.name(), originator, null,
|
return new TbMsg(null, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,
|
||||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, callback);
|
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg transformMsg(TbMsg tbMsg, TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data) {
|
public static TbMsg transformMsg(TbMsg tbMsg, TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type.name(), originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
||||||
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
|
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,6 +316,36 @@ public final class TbMsg implements Serializable {
|
|||||||
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY);
|
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||||
|
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
||||||
|
this.id = id;
|
||||||
|
this.queueName = queueName;
|
||||||
|
if (ts > 0) {
|
||||||
|
this.ts = ts;
|
||||||
|
} else {
|
||||||
|
this.ts = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
this.internalType = internalType;
|
||||||
|
this.type = internalType.name();
|
||||||
|
this.originator = originator;
|
||||||
|
if (customerId == null || customerId.isNullUid()) {
|
||||||
|
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
|
||||||
|
this.customerId = (CustomerId) originator;
|
||||||
|
} else {
|
||||||
|
this.customerId = null;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.customerId = customerId;
|
||||||
|
}
|
||||||
|
this.metaData = metaData;
|
||||||
|
this.dataType = dataType;
|
||||||
|
this.data = data;
|
||||||
|
this.ruleChainId = ruleChainId;
|
||||||
|
this.ruleNodeId = ruleNodeId;
|
||||||
|
this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx();
|
||||||
|
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
|
||||||
|
}
|
||||||
|
|
||||||
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||||
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
@ -325,6 +356,7 @@ public final class TbMsg implements Serializable {
|
|||||||
this.ts = System.currentTimeMillis();
|
this.ts = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
this.type = type;
|
this.type = type;
|
||||||
|
this.internalType = getInternalType();
|
||||||
this.originator = originator;
|
this.originator = originator;
|
||||||
if (customerId == null || customerId.isNullUid()) {
|
if (customerId == null || customerId.isNullUid()) {
|
||||||
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
|
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
|
||||||
@ -468,8 +500,19 @@ public final class TbMsg implements Serializable {
|
|||||||
return ts;
|
return ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TbMsgType getInternalType() {
|
||||||
|
if (internalType != null) {
|
||||||
|
return internalType;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return TbMsgType.valueOf(type);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
return TbMsgType.CUSTOM_OR_NA_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isTypeOf(TbMsgType tbMsgType) {
|
public boolean isTypeOf(TbMsgType tbMsgType) {
|
||||||
return tbMsgType != null && tbMsgType.name().equals(this.type);
|
return tbMsgType != null && tbMsgType.equals(getInternalType());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isTypeOneOf(TbMsgType... types) {
|
public boolean isTypeOneOf(TbMsgType... types) {
|
||||||
|
|||||||
@ -50,7 +50,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||||
ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getType()));
|
ctx.tellNext(msg, TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType()));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -81,9 +81,10 @@ class TbMsgTypeSwitchNodeTest {
|
|||||||
var msg = resultMsgs.get(i);
|
var msg = resultMsgs.get(i);
|
||||||
assertThat(msg).isNotNull();
|
assertThat(msg).isNotNull();
|
||||||
assertThat(msg.getType()).isNotNull();
|
assertThat(msg.getType()).isNotNull();
|
||||||
|
assertThat(msg.getType()).isEqualTo(msg.getInternalType().name());
|
||||||
assertThat(msg).isSameAs(tbMsgList.get(i));
|
assertThat(msg).isSameAs(tbMsgList.get(i));
|
||||||
assertThat(resultNodeConnections.get(i))
|
assertThat(resultNodeConnections.get(i))
|
||||||
.isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getType()));
|
.isEqualTo(TbMsgType.getRuleNodeConnectionOrElseOther(msg.getInternalType()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user