diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3fb28aee38..c07134105c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -235,7 +235,8 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); } @@ -314,7 +315,7 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .addAllRelationTypes(relationTypes); if (failureMessage != null) { msg.setFailureMessage(failureMessage); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 5c3f2b7052..d93733f562 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -381,7 +381,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(tbMsg.getId(), msg), callback); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java index 00a9a8e2ce..cec2aa19da 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.msg.gen.MsgProtos; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.UUID; @@ -34,7 +35,7 @@ public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends Sequenti @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + MsgProtos.TbMsgProto proto = ProtoUtils.getTbMsgProto(msg); return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); } catch (InvalidProtocolBufferException e) { log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index 0242c676a1..023b4d4cc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -19,8 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.data.queue.ProcessingStrategy; -import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory { } log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { try { @@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory { log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY))); } if (log.isTraceEnabled()) { - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY))); } return new TbRuleEngineProcessingDecision(true, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 879da5afc6..3aa0d2eabd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -178,7 +179,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager relationTypes; @@ -207,7 +208,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager> pending : map.entrySet()) { ToRuleEngineMsg tmp = pending.getValue().getValue(); - TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); + TbMsg tmpMsg = ProtoUtils.fromTbMsgProto(config.getName(), tmp, TbMsgCallback.EMPTY); RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); if (printAll) { log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); @@ -236,7 +237,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager msg : msgs) { try { - MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); + MsgProtos.TbMsgProto tbMsgProto = ProtoUtils.getTbMsgProto(msg.getValue()); EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 463624843a..1d5cbb6ebc 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -133,7 +133,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(tbMsg)) + .setResponseProto(TbMsg.toProto(tbMsg)) .build(); clusterService.pushNotificationToCore(serviceId, msg, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java index 7cfa55b857..4496bf0ef2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java @@ -73,7 +73,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService { UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); Consumer consumer = requests.remove(requestId); if (consumer != null) { - consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); + consumer.accept(TbMsg.fromProto(null, restApiCallResponseMsg.getResponseProto(), restApiCallResponseMsg.getResponse(), TbMsgCallback.EMPTY)); } else { log.trace("[{}] Unknown or stale rest api call response received", requestId); } diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index 54205327ca..bf055ee146 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -770,11 +770,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .addAllRelationTypes(List.of(connectionType)).build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -827,11 +827,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -1009,11 +1009,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_.id_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .setFailureMessage(EXCEPTION_MSG) .addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build()); diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 4fd613dc1d..ba45be8e28 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -762,7 +762,8 @@ public class TenantControllerTest extends AbstractControllerTest { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null); return tbMsg; } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 314c76b058..dd9c4bebcf 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -737,7 +737,7 @@ public class TbRuleEngineQueueConsumerManagerTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java index 110bba0eaf..8c019029a5 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java @@ -263,7 +263,7 @@ public class TbRuleEngineStrategyTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java index fceef46b89..ad5160afda 100644 --- a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java @@ -55,7 +55,7 @@ class DefaultTbRuleEngineRpcServiceTest { var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .build(); // WHEN diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java index eeec376ab4..60c71a3837 100644 --- a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -144,7 +144,7 @@ public class DefaultRuleEngineCallServiceTest { private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { return TransportProtos.RestApiCallResponseMsgProto.newBuilder() - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) .build(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 4e0f583285..1d8d9497a9 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.gen.MsgProtos; +import org.thingsboard.server.common.msg.gen.MsgProtos.TbMsgProto; import org.thingsboard.server.common.msg.queue.TbMsgCallback; import java.io.Serializable; @@ -151,12 +152,8 @@ public final class TbMsg implements Serializable { this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); } - public static ByteString toByteString(TbMsg msg) { - return ByteString.copyFrom(toByteArray(msg)); - } - - public static byte[] toByteArray(TbMsg msg) { - MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); + public static TbMsgProto toProto(TbMsg msg) { + TbMsgProto.Builder builder = TbMsgProto.newBuilder(); builder.setId(msg.getId().toString()); builder.setTs(msg.getTs()); builder.setType(msg.getType()); @@ -205,56 +202,55 @@ public final class TbMsg implements Serializable { } builder.setCtx(msg.ctx.toProto()); - return builder.build().toByteArray(); + return builder.build(); } - public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) { + @Deprecated(forRemoval = true, since = "4.1") // to be removed in 4.2 + public static TbMsg fromProto(String queueName, TbMsgProto proto, ByteString data, TbMsgCallback callback) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); - TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); - EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - CustomerId customerId = null; - RuleChainId ruleChainId = null; - RuleNodeId ruleNodeId = null; - UUID correlationId = null; - Integer partition = null; - List calculatedFieldIds = new CopyOnWriteArrayList<>(); - if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { - customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + if (!data.isEmpty()) { + proto = TbMsgProto.parseFrom(data); } - if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { - ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); - } - if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { - ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); - } - if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { - correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); - partition = proto.getPartition(); - } - - for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { - CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID( - cfIdProto.getCalculatedFieldIdMSB(), - cfIdProto.getCalculatedFieldIdLSB() - )); - calculatedFieldIds.add(calculatedFieldId); - } - - TbMsgProcessingCtx ctx; - if (proto.hasCtx()) { - ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); - } else { - // Backward compatibility with unprocessed messages fetched from queue after update. - ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter()); - } - - TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, - metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } + return fromProto(queueName, proto, callback); + } + + public static TbMsg fromProto(String queueName, TbMsgProto proto, TbMsgCallback callback) { + TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + CustomerId customerId = null; + RuleChainId ruleChainId = null; + RuleNodeId ruleNodeId = null; + UUID correlationId = null; + Integer partition = null; + List calculatedFieldIds = new CopyOnWriteArrayList<>(); + if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { + customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + } + if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { + ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); + } + if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { + ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); + } + if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { + correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); + partition = proto.getPartition(); + } + + for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID( + cfIdProto.getCalculatedFieldIdMSB(), + cfIdProto.getCalculatedFieldIdLSB() + )); + calculatedFieldIds.add(calculatedFieldId); + } + TbMsgProcessingCtx ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); + TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, + metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); } public int getAndIncrementRuleNodeCounter() { @@ -504,11 +500,11 @@ public final class TbMsg implements Serializable { public String toString() { return "TbMsg.TbMsgBuilder(queueName=" + this.queueName + ", id=" + this.id + ", ts=" + this.ts + - ", type=" + this.type + ", internalType=" + this.internalType + ", originator=" + this.originator + - ", customerId=" + this.customerId + ", metaData=" + this.metaData + ", dataType=" + this.dataType + - ", data=" + this.data + ", ruleChainId=" + this.ruleChainId + ", ruleNodeId=" + this.ruleNodeId + - ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", previousCalculatedFields=" + this.previousCalculatedFieldIds + - ", ctx=" + this.ctx + ", callback=" + this.callback + ")"; + ", type=" + this.type + ", internalType=" + this.internalType + ", originator=" + this.originator + + ", customerId=" + this.customerId + ", metaData=" + this.metaData + ", dataType=" + this.dataType + + ", data=" + this.data + ", ruleChainId=" + this.ruleChainId + ", ruleNodeId=" + this.ruleNodeId + + ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", previousCalculatedFields=" + this.previousCalculatedFieldIds + + ", ctx=" + this.ctx + ", callback=" + this.callback + ")"; } } diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto index 65a967e9e4..e70104d503 100644 --- a/common/message/src/main/proto/tbmsg.proto +++ b/common/message/src/main/proto/tbmsg.proto @@ -59,8 +59,8 @@ message TbMsgProto { string data = 14; int64 ts = 15; - // Will be removed in 3.4. Moved to processing context - int32 ruleNodeExecCounter = 16; + + // ruleNodeExecCounter (16) was removed in 4.1 int64 customerIdMSB = 17; int64 customerIdLSB = 18; diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 1ebd753f3c..502884eb31 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -18,6 +18,8 @@ package org.thingsboard.server.common.util; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -76,12 +78,15 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.sync.vc.RepositoryAuthMethod; import org.thingsboard.server.common.data.sync.vc.RepositorySettings; +import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; +import org.thingsboard.server.common.msg.gen.MsgProtos; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; @@ -93,8 +98,8 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto; +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import java.util.ArrayList; import java.util.Arrays; @@ -1344,6 +1349,21 @@ public class ProtoUtils { return builder.build(); } + @Deprecated(forRemoval = true, since = "4.1") + public static MsgProtos.TbMsgProto getTbMsgProto(TransportProtos.ToRuleEngineMsg ruleEngineMsg) throws InvalidProtocolBufferException { + if (ruleEngineMsg.getTbMsg().isEmpty()) { + return ruleEngineMsg.getTbMsgProto(); + } else { + return MsgProtos.TbMsgProto.parseFrom(ruleEngineMsg.getTbMsg()); + } + } + + @SneakyThrows + @Deprecated(forRemoval = true, since = "4.1") // inline to TbMsg.fromProto(queueName, ruleEngineMsg.getTbMsgProto(), callback) + public static TbMsg fromTbMsgProto(String queueName, TransportProtos.ToRuleEngineMsg ruleEngineMsg, TbMsgCallback callback) { + return TbMsg.fromProto(queueName, getTbMsgProto(ruleEngineMsg), callback); + } + private static boolean isNotNull(Object obj) { return obj != null; } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 938a1692ae..2a97fd35d0 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -20,6 +20,8 @@ package transport; option java_package = "org.thingsboard.server.gen.transport"; option java_outer_classname = "TransportProtos"; +import "tbmsg.proto"; + /** * Common data structures */ @@ -125,7 +127,8 @@ message SessionInfoProto { message RestApiCallResponseMsgProto { int64 requestIdMSB = 1; int64 requestIdLSB = 2; - bytes response = 5; + bytes response = 5 [deprecated = true]; + msgqueue.TbMsgProto responseProto = 6; } enum SessionEvent { @@ -1700,9 +1703,10 @@ message ToCalculatedFieldNotificationMsg { message ToRuleEngineMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; - bytes tbMsg = 3; + bytes tbMsg = 3 [deprecated = true]; // for removal in 4.2 repeated string relationTypes = 4; string failureMessage = 5; + msgqueue.TbMsgProto tbMsgProto = 6; } message ToRuleEngineNotificationMsg { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java index 46e56a97fc..db32eeca14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java @@ -65,7 +65,7 @@ public class TbRuleEngineProducerService { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); } ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);