From e0fd611c768d98ef6475f9872aaa08bb533bad82 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 13 Dec 2024 14:35:02 +0100 Subject: [PATCH] used TbMsgProto instead of ByteString --- .../actors/ruleChain/DefaultTbContext.java | 5 +- .../RuleChainActorMessageProcessor.java | 2 +- .../RuleNodeActorMessageProcessor.java | 2 +- .../device/DeviceProvisionServiceImpl.java | 3 +- ...riginatorIdTbRuleEngineSubmitStrategy.java | 7 +- ...TbRuleEngineProcessingStrategyFactory.java | 6 +- .../TbRuleEngineQueueConsumerManager.java | 14 +++- .../rpc/DefaultTbRuleEngineRpcService.java | 2 +- .../DefaultRuleEngineCallService.java | 2 +- .../actors/rule/DefaultTbContextTest.java | 12 +-- .../controller/TenantControllerTest.java | 3 +- .../TbRuleEngineQueueConsumerManagerTest.java | 2 +- .../ruleengine/TbRuleEngineStrategyTest.java | 2 +- .../DefaultTbRuleEngineRpcServiceTest.java | 2 +- .../DefaultRuleEngineCallServiceTest.java | 2 +- .../thingsboard/server/common/msg/TbMsg.java | 82 ++++++++++--------- common/proto/src/main/proto/queue.proto | 8 +- .../common/TbRuleEngineProducerService.java | 2 +- 18 files changed, 90 insertions(+), 68 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 0070eba4a1..6ebcc1843e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -230,7 +230,8 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); } @@ -309,7 +310,7 @@ public class DefaultTbContext implements TbContext { TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .addAllRelationTypes(relationTypes); if (failureMessage != null) { msg.setFailureMessage(failureMessage); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 460da228c3..917066cd2d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -370,7 +370,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(tbMsg.getId(), msg), callback); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java index 1ee6df8361..3c11095894 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByOriginatorIdTbRuleEngineSubmitStrategy.java @@ -34,7 +34,12 @@ public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends Sequenti @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + MsgProtos.TbMsgProto proto; + if (msg.getTbMsg().isEmpty()) { + proto = msg.getTbMsgProto(); + } else { + proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); + } return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); } catch (InvalidProtocolBufferException e) { log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index 4712497524..597bcef098 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory { } log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { try { @@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory { log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } if (log.isTraceEnabled()) { - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY))); } return new TbRuleEngineProcessingDecision(true, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index c2823d3c00..231d7b205d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -170,7 +170,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager relationTypes; @@ -199,7 +199,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager> pending : map.entrySet()) { ToRuleEngineMsg tmp = pending.getValue().getValue(); - TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); + TbMsg tmpMsg = TbMsg.fromProto(config.getName(), tmp.getTbMsgProto(), tmp.getTbMsg(), TbMsgCallback.EMPTY); RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); if (printAll) { log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); @@ -228,7 +228,13 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager msg : msgs) { try { - MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); + MsgProtos.TbMsgProto tbMsgProto; + if (msg.getValue().getTbMsg().isEmpty()) { + tbMsgProto = msg.getValue().getTbMsgProto(); + } else { + tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg()); + } + EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index debc1cb987..80b3bd7a37 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -132,7 +132,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(tbMsg)) + .setResponseProto(TbMsg.toProto(tbMsg)) .build(); clusterService.pushNotificationToCore(serviceId, msg, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java index 6f4d18b94e..e435c8ad36 100644 --- a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java @@ -73,7 +73,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService { UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); Consumer consumer = requests.remove(requestId); if (consumer != null) { - consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); + consumer.accept(TbMsg.fromProto(null, restApiCallResponseMsg.getResponseProto(), restApiCallResponseMsg.getResponse(), TbMsgCallback.EMPTY)); } else { log.trace("[{}] Unknown or stale rest api call response received", requestId); } diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index 21a4a45c6f..4370b98b2d 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -644,11 +644,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .addAllRelationTypes(List.of(connectionType)).build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -701,11 +701,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .build()); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); @@ -883,11 +883,11 @@ class DefaultTbContextTest { ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); assertThat(actualToRuleEngineMsg).usingRecursiveComparison() - .ignoringFields("tbMsg_") + .ignoringFields("tbMsgProto_.id_") .isEqualTo(ToRuleEngineMsg.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setTbMsgProto(TbMsg.toProto(expectedTbMsg)) .setFailureMessage(EXCEPTION_MSG) .addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build()); diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index c8cc72c5b5..dbf40c0e30 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -747,7 +747,8 @@ public class TenantControllerTest extends AbstractControllerTest { TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + .setTbMsgProto(TbMsg.toProto(tbMsg)) + .build(); tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null); return tbMsg; } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 66e3de13d1..bff5d165f4 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -738,7 +738,7 @@ public class TbRuleEngineQueueConsumerManagerTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java index 098c622e31..435304fdef 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java @@ -263,7 +263,7 @@ public class TbRuleEngineStrategyTest { .setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits()) .addRelationTypes("Success") - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .build()); } diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java index 2cab5f991a..ea0b3b91f5 100644 --- a/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcServiceTest.java @@ -50,7 +50,7 @@ class DefaultTbRuleEngineRpcServiceTest { var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .build(); // WHEN diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java index c49149bd07..a4ee9cda71 100644 --- a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -132,7 +132,7 @@ public class DefaultRuleEngineCallServiceTest { private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { return TransportProtos.RestApiCallResponseMsgProto.newBuilder() - .setResponse(TbMsg.toByteString(msg)) + .setResponseProto(TbMsg.toProto(msg)) .setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits()) .build(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 47fb80c90e..88e7c6b691 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -371,11 +371,7 @@ public final class TbMsg implements Serializable { this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); } - public static ByteString toByteString(TbMsg msg) { - return ByteString.copyFrom(toByteArray(msg)); - } - - public static byte[] toByteArray(TbMsg msg) { + public static MsgProtos.TbMsgProto toProto(TbMsg msg) { MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); builder.setId(msg.getId().toString()); builder.setTs(msg.getTs()); @@ -415,47 +411,55 @@ public final class TbMsg implements Serializable { } builder.setCtx(msg.ctx.toProto()); - return builder.build().toByteArray(); + return builder.build(); } - public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) { + //TODO: added for processing old messages from queue, should be removed after release + @Deprecated(forRemoval = true) + public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, ByteString data, TbMsgCallback callback) { try { - MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); - TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); - EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - CustomerId customerId = null; - RuleChainId ruleChainId = null; - RuleNodeId ruleNodeId = null; - UUID correlationId = null; - Integer partition = null; - if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { - customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + if (!data.isEmpty()) { + proto = MsgProtos.TbMsgProto.parseFrom(data); } - if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { - ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); - } - if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { - ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); - } - if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { - correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); - partition = proto.getPartition(); - } - - TbMsgProcessingCtx ctx; - if (proto.hasCtx()) { - ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); - } else { - // Backward compatibility with unprocessed messages fetched from queue after update. - ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter()); - } - - TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, - metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, ctx, callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } + return fromProto(queueName, proto, callback); + } + + public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, TbMsgCallback callback) { + TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + CustomerId customerId = null; + RuleChainId ruleChainId = null; + RuleNodeId ruleNodeId = null; + UUID correlationId = null; + Integer partition = null; + if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { + customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + } + if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { + ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); + } + if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { + ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); + } + if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) { + correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB()); + partition = proto.getPartition(); + } + + TbMsgProcessingCtx ctx; + if (proto.hasCtx()) { + ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); + } else { + // Backward compatibility with unprocessed messages fetched from queue after update. + ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter()); + } + + TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, + metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, ctx, callback); } public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 228a4039d2..ceef0914a7 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -20,6 +20,8 @@ package transport; option java_package = "org.thingsboard.server.gen.transport"; option java_outer_classname = "TransportProtos"; +import "tbmsg.proto"; + /** * Common data structures */ @@ -108,7 +110,8 @@ message SessionInfoProto { message RestApiCallResponseMsgProto { int64 requestIdMSB = 1; int64 requestIdLSB = 2; - bytes response = 5; + bytes response = 5 [deprecated = true]; + msgqueue.TbMsgProto responseProto = 6; } enum SessionEvent { @@ -1556,9 +1559,10 @@ message ToEdgeEventNotificationMsg { message ToRuleEngineMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; - bytes tbMsg = 3; + bytes tbMsg = 3 [deprecated = true]; repeated string relationTypes = 4; string failureMessage = 5; + msgqueue.TbMsgProto tbMsgProto = 6; } message ToRuleEngineNotificationMsg { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java index 49b40e3a6d..aefce4e176 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbRuleEngineProducerService.java @@ -65,7 +65,7 @@ public class TbRuleEngineProducerService { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); } ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() - .setTbMsg(TbMsg.toByteString(tbMsg)) + .setTbMsgProto(TbMsg.toProto(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);