diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3fb28aee38..c07134105c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -235,7 +235,8 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); } @@ -314,7 +315,7 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .addAllRelationTypes(relationTypes); if (failureMessage != null) { msg.setFailureMessage(failureMessage); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 5c3f2b7052..d93733f562 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -381,7 +381,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(tbMsg.getId(), msg), callback); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java index 00a9a8e2ce..aec9252356 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java @@ -34,7 +34,12 @@ public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends Sequenti @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + MsgProtos.TbMsgProto proto; + if (msg.getTbMsg().isEmpty()) { + proto = msg.getTbMsgProto(); + } else { + proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + } return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); } catch (InvalidProtocolBufferException e) { log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index 0242c676a1..2adcd9b199 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory { } log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { try { @@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory { log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } if (log.isTraceEnabled()) { - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } return new TbRuleEngineProcessingDecision(true, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 879da5afc6..01368f780e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -178,7 +178,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager relationTypes; @@ -207,7 +207,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager> pending : map.entrySet()) { ToRuleEngineMsg tmp = pending.getValue().getValue(); - TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); + TbMsg tmpMsg = TbMsg.fromProto(config.getName(), tmp.getTbMsgProto(), tmp.getTbMsg(), TbMsgCallback.EMPTY); RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); if (printAll) { log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); @@ -236,7 +236,13 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager msg : msgs) { try { - MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); + MsgProtos.TbMsgProto tbMsgProto; + if (msg.getValue().getTbMsg().isEmpty()) { + tbMsgProto = msg.getValue().getTbMsgProto(); + } else { + tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg()); + } + EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 463624843a..1d5cbb6ebc 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -133,7 +133,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(tbMsg)) + .setResponseProto(TbMsg.toProto(tbMsg)) .build(); clusterService.pushNotificationToCore(serviceId, msg, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java index 7cfa55b857..4496bf0ef2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java @@ -73,7 +73,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService { UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); Consumer consumer = requests.remove(requestId); if (consumer != null) { - consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); + consumer.accept(TbMsg.fromProto(null, restApiCallResponseMsg.getResponseProto(), restApiCallResponseMsg.getResponse(), TbMsgCallback.EMPTY)); } else { log.trace("[{}] Unknown or stale rest api call response received", requestId); } diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index 54205327ca..bf055ee146 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -770,11 +770,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .addAllRelationTypes(List.of(connectionType)).build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -827,11 +827,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -1009,11 +1009,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_.id_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .setFailureMessage(EXCEPTION_MSG) .addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build()); diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 4fd613dc1d..ba45be8e28 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -762,7 +762,8 @@ public class TenantControllerTest extends AbstractControllerTest { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null); return tbMsg; } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 314c76b058..dd9c4bebcf 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -737,7 +737,7 @@ public class TbRuleEngineQueueConsumerManagerTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java index 110bba0eaf..8c019029a5 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java @@ -263,7 +263,7 @@ public class TbRuleEngineStrategyTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java index fceef46b89..ad5160afda 100644 --- a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java @@ -55,7 +55,7 @@ class DefaultTbRuleEngineRpcServiceTest { var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .build(); // WHEN diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java index eeec376ab4..60c71a3837 100644 --- a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -144,7 +144,7 @@ public class DefaultRuleEngineCallServiceTest { private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { return TransportProtos.RestApiCallResponseMsgProto.newBuilder() - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) .build(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 4e0f583285..b7447b14ba 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -151,11 +151,7 @@ public final class TbMsg implements Serializable { this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); } - public static ByteString toByteString(TbMsg msg) { - return ByteString.copyFrom(toByteArray(msg)); - } - - public static byte[] toByteArray(TbMsg msg) { + public static MsgProtos.TbMsgProto toProto(TbMsg msg) { MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); builder.setId(msg.getId().toString()); builder.setTs(msg.getTs()); @@ -205,56 +201,61 @@ public final class TbMsg implements Serializable { } builder.setCtx(msg.ctx.toProto()); - return builder.build().toByteArray(); + return builder.build(); } - public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) { + //TODO: added for processing old messages from queue, should be removed after release + @Deprecated(forRemoval = true) + public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, ByteString data, TbMsgCallback callback) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); - TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); - EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - CustomerId customerId = null; - RuleChainId ruleChainId = null; - RuleNodeId ruleNodeId = null; - UUID correlationId = null; - Integer partition = null; - List calculatedFieldIds = new CopyOnWriteArrayList<>(); - if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { - customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); - } - if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { - ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); - } - if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { - ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); - } - if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { - correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); - partition = proto.getPartition(); + if (!data.isEmpty()) { + proto = MsgProtos.TbMsgProto.parseFrom(data); } + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException("Could not parse protobuf for TbMsg", e); + } + return fromProto(queueName, proto, callback); + } - for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { + public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, TbMsgCallback callback) { + TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + CustomerId customerId = null; + RuleChainId ruleChainId = null; + RuleNodeId ruleNodeId = null; + UUID correlationId = null; + Integer partition = null; + List calculatedFieldIds = new CopyOnWriteArrayList<>();if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { + customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + } + if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { + ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); + } + if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { + ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); + } + if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { + correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); + partition = proto.getPartition(); + } + + for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID( cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB() )); calculatedFieldIds.add(calculatedFieldId); - } - - TbMsgProcessingCtx ctx; - if (proto.hasCtx()) { - ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); - } else { - // Backward compatibility with unprocessed messages fetched from queue after update. - ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter()); - } - - TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, - metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); - } catch (InvalidProtocolBufferException e) { - throw new IllegalStateException("Could not parse protobuf for TbMsg", e); + }TbMsgProcessingCtx ctx; + if (proto.hasCtx()) { + ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); + } else { + // Backward compatibility with unprocessed messages fetched from queue after update. + ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter()); } + + TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, + metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); } public int getAndIncrementRuleNodeCounter() { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 938a1692ae..449f662a93 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -20,6 +20,8 @@ package transport; option java_package = "org.thingsboard.server.gen.transport"; option java_outer_classname = "TransportProtos"; +import "tbmsg.proto"; + /** * Common data structures */ @@ -125,7 +127,8 @@ message SessionInfoProto { message RestApiCallResponseMsgProto { int64 requestIdMSB = 1; int64 requestIdLSB = 2; - bytes response = 5; + bytes response = 5 [deprecated = true]; + msgqueue.TbMsgProto responseProto = 6; } enum SessionEvent { @@ -1700,9 +1703,10 @@ message ToCalculatedFieldNotificationMsg { message ToRuleEngineMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; - bytes tbMsg = 3; + bytes tbMsg = 3 [deprecated = true]; repeated string relationTypes = 4; string failureMessage = 5; + msgqueue.TbMsgProto tbMsgProto = 6; } message ToRuleEngineNotificationMsg { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java index 46e56a97fc..db32eeca14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java @@ -65,7 +65,7 @@ public class TbRuleEngineProducerService { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); } ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);