Merge pull request #12257 from YevhenBondarenko/feature/prod-4738

Used TbMsgProto instead of ByteString
This commit is contained in:
Viacheslav Klimov 2025-05-07 12:28:29 +03:00 committed by GitHub
commit 0adfaf2b01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 92 additions and 73 deletions

View File

@ -235,7 +235,8 @@ public class DefaultTbContext implements TbContext {
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build(); .setTbMsgProto(TbMsg.toProto(tbMsg))
.build();
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
} }
@ -314,7 +315,7 @@ public class DefaultTbContext implements TbContext {
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)) .setTbMsgProto(TbMsg.toProto(tbMsg))
.addAllRelationTypes(relationTypes); .addAllRelationTypes(relationTypes);
if (failureMessage != null) { if (failureMessage != null) {
msg.setFailureMessage(failureMessage); msg.setFailureMessage(failureMessage);

View File

@ -381,7 +381,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder() ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(newMsg)) .setTbMsgProto(TbMsg.toProto(newMsg))
.build(); .build();
clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper); clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
} }

View File

@ -210,7 +210,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
TransportProtos.ToRuleEngineMsg toQueueMsg = TransportProtos.ToRuleEngineMsg.newBuilder() TransportProtos.ToRuleEngineMsg toQueueMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)) .setTbMsgProto(TbMsg.toProto(tbMsg))
.build(); .build();
systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null); systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null);
defaultCtx.ack(source); defaultCtx.ack(source);

View File

@ -284,7 +284,8 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator()); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTbMsgProto(TbMsg.toProto(tbMsg))
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback); ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);

View File

@ -34,7 +34,12 @@ public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends Sequenti
@Override @Override
protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
try { try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg()); MsgProtos.TbMsgProto proto;
if (msg.getTbMsg().isEmpty()) {
proto = msg.getTbMsgProto();
} else {
proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg());
}
return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg); log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg);

View File

@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory {
} }
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY)));
} }
if (pauseBetweenRetries > 0) { if (pauseBetweenRetries > 0) {
try { try {
@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory {
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
} }
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY)));
} }
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromProto(result.getQueueName(), msg.getValue().getTbMsgProto(), msg.getValue().getTbMsg(), TbMsgCallback.EMPTY)));
} }
return new TbRuleEngineProcessingDecision(true, null); return new TbRuleEngineProcessingDecision(true, null);
} }

View File

@ -178,7 +178,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
new TbMsgPackCallback(id, tenantId, packCtx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : new TbMsgPackCallback(id, tenantId, packCtx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
new TbMsgPackCallback(id, tenantId, packCtx); new TbMsgPackCallback(id, tenantId, packCtx);
try { try {
if (!toRuleEngineMsg.getTbMsg().isEmpty()) { if (!toRuleEngineMsg.getTbMsg().isEmpty() || toRuleEngineMsg.getTbMsgProto().isInitialized()) {
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback); forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
} else { } else {
callback.onSuccess(); callback.onSuccess();
@ -189,7 +189,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
} }
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback); TbMsg tbMsg = TbMsg.fromProto(queueName, toRuleEngineMsg.getTbMsgProto(), toRuleEngineMsg.getTbMsg(), callback);
QueueToRuleEngineMsg msg; QueueToRuleEngineMsg msg;
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList(); ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
Set<String> relationTypes; Set<String> relationTypes;
@ -207,7 +207,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size()); log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size());
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) { for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
ToRuleEngineMsg tmp = pending.getValue().getValue(); ToRuleEngineMsg tmp = pending.getValue().getValue();
TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); TbMsg tmpMsg = TbMsg.fromProto(config.getName(), tmp.getTbMsgProto(), tmp.getTbMsg(), TbMsgCallback.EMPTY);
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
if (printAll) { if (printAll) {
log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
@ -236,7 +236,13 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
} }
for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) { for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) {
try { try {
MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); MsgProtos.TbMsgProto tbMsgProto;
if (msg.getValue().getTbMsg().isEmpty()) {
tbMsgProto = msg.getValue().getTbMsgProto();
} else {
tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg());
}
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator); TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator);

View File

@ -133,7 +133,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder() TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(tbMsg)) .setResponseProto(TbMsg.toProto(tbMsg))
.build(); .build();
clusterService.pushNotificationToCore(serviceId, msg, null); clusterService.pushNotificationToCore(serviceId, msg, null);
} }

View File

@ -73,7 +73,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService {
UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB()); UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB());
Consumer<TbMsg> consumer = requests.remove(requestId); Consumer<TbMsg> consumer = requests.remove(requestId);
if (consumer != null) { if (consumer != null) {
consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY)); consumer.accept(TbMsg.fromProto(null, restApiCallResponseMsg.getResponseProto(), restApiCallResponseMsg.getResponse(), TbMsgCallback.EMPTY));
} else { } else {
log.trace("[{}] Unknown or stale rest api call response received", requestId); log.trace("[{}] Unknown or stale rest api call response received", requestId);
} }

View File

@ -770,11 +770,11 @@ class DefaultTbContextTest {
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison() assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_") .ignoringFields("tbMsgProto_")
.isEqualTo(ToRuleEngineMsg.newBuilder() .isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg)) .setTbMsgProto(TbMsg.toProto(expectedTbMsg))
.addAllRelationTypes(List.of(connectionType)).build()); .addAllRelationTypes(List.of(connectionType)).build());
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
@ -827,11 +827,11 @@ class DefaultTbContextTest {
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison() assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_") .ignoringFields("tbMsgProto_")
.isEqualTo(ToRuleEngineMsg.newBuilder() .isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg)) .setTbMsgProto(TbMsg.toProto(expectedTbMsg))
.build()); .build());
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
@ -1009,11 +1009,11 @@ class DefaultTbContextTest {
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison() assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_") .ignoringFields("tbMsgProto_.id_")
.isEqualTo(ToRuleEngineMsg.newBuilder() .isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg)) .setTbMsgProto(TbMsg.toProto(expectedTbMsg))
.setFailureMessage(EXCEPTION_MSG) .setFailureMessage(EXCEPTION_MSG)
.addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build()); .addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build());

View File

@ -762,7 +762,8 @@ public class TenantControllerTest extends AbstractControllerTest {
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build(); .setTbMsgProto(TbMsg.toProto(tbMsg))
.build();
tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null); tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null);
return tbMsg; return tbMsg;
} }

View File

@ -737,7 +737,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
.setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdMSB(tenantId.getMostSignificantBits())
.setTenantIdLSB(tenantId.getLeastSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits())
.addRelationTypes("Success") .addRelationTypes("Success")
.setTbMsg(TbMsg.toByteString(tbMsg)) .setTbMsgProto(TbMsg.toProto(tbMsg))
.build()); .build());
} }

View File

@ -263,7 +263,7 @@ public class TbRuleEngineStrategyTest {
.setTenantIdMSB(tenantId.getMostSignificantBits()) .setTenantIdMSB(tenantId.getMostSignificantBits())
.setTenantIdLSB(tenantId.getLeastSignificantBits()) .setTenantIdLSB(tenantId.getLeastSignificantBits())
.addRelationTypes("Success") .addRelationTypes("Success")
.setTbMsg(TbMsg.toByteString(tbMsg)) .setTbMsgProto(TbMsg.toProto(tbMsg))
.build()); .build());
} }

View File

@ -55,7 +55,7 @@ class DefaultTbRuleEngineRpcServiceTest {
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder() var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits())
.setResponse(TbMsg.toByteString(msg)) .setResponseProto(TbMsg.toProto(msg))
.build(); .build();
// WHEN // WHEN

View File

@ -144,7 +144,7 @@ public class DefaultRuleEngineCallServiceTest {
private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) { private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) {
return TransportProtos.RestApiCallResponseMsgProto.newBuilder() return TransportProtos.RestApiCallResponseMsgProto.newBuilder()
.setResponse(TbMsg.toByteString(msg)) .setResponseProto(TbMsg.toProto(msg))
.setRequestIdMSB(requestId.getMostSignificantBits()) .setRequestIdMSB(requestId.getMostSignificantBits())
.setRequestIdLSB(requestId.getLeastSignificantBits()) .setRequestIdLSB(requestId.getLeastSignificantBits())
.build(); .build();

View File

@ -151,11 +151,7 @@ public final class TbMsg implements Serializable {
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
} }
public static ByteString toByteString(TbMsg msg) { public static MsgProtos.TbMsgProto toProto(TbMsg msg) {
return ByteString.copyFrom(toByteArray(msg));
}
public static byte[] toByteArray(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString()); builder.setId(msg.getId().toString());
builder.setTs(msg.getTs()); builder.setTs(msg.getTs());
@ -205,56 +201,61 @@ public final class TbMsg implements Serializable {
} }
builder.setCtx(msg.ctx.toProto()); builder.setCtx(msg.ctx.toProto());
return builder.build().toByteArray(); return builder.build();
} }
public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) { //TODO: added for processing old messages from queue, should be removed after release
@Deprecated(forRemoval = true)
public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, ByteString data, TbMsgCallback callback) {
try { try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); if (!data.isEmpty()) {
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); proto = MsgProtos.TbMsgProto.parseFrom(data);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
CustomerId customerId = null;
RuleChainId ruleChainId = null;
RuleNodeId ruleNodeId = null;
UUID correlationId = null;
Integer partition = null;
List<CalculatedFieldId> calculatedFieldIds = new CopyOnWriteArrayList<>();
if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) {
customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB()));
}
if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) {
ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
}
if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
}
if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) {
correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB());
partition = proto.getPartition();
} }
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
return fromProto(queueName, proto, callback);
}
for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { public static TbMsg fromProto(String queueName, MsgProtos.TbMsgProto proto, TbMsgCallback callback) {
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
CustomerId customerId = null;
RuleChainId ruleChainId = null;
RuleNodeId ruleNodeId = null;
UUID correlationId = null;
Integer partition = null;
List<CalculatedFieldId> calculatedFieldIds = new CopyOnWriteArrayList<>();if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) {
customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB()));
}
if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) {
ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
}
if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
}
if (proto.getCorrelationIdMSB() != 0L && proto.getCorrelationIdLSB() != 0L) {
correlationId = new UUID(proto.getCorrelationIdMSB(), proto.getCorrelationIdLSB());
partition = proto.getPartition();
}
for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID( CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(
cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdMSB(),
cfIdProto.getCalculatedFieldIdLSB() cfIdProto.getCalculatedFieldIdLSB()
)); ));
calculatedFieldIds.add(calculatedFieldId); calculatedFieldIds.add(calculatedFieldId);
} }TbMsgProcessingCtx ctx;
if (proto.hasCtx()) {
TbMsgProcessingCtx ctx; ctx = TbMsgProcessingCtx.fromProto(proto.getCtx());
if (proto.hasCtx()) { } else {
ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); // Backward compatibility with unprocessed messages fetched from queue after update.
} else { ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter());
// Backward compatibility with unprocessed messages fetched from queue after update.
ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter());
}
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
} }
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
} }
public int getAndIncrementRuleNodeCounter() { public int getAndIncrementRuleNodeCounter() {

View File

@ -20,6 +20,8 @@ package transport;
option java_package = "org.thingsboard.server.gen.transport"; option java_package = "org.thingsboard.server.gen.transport";
option java_outer_classname = "TransportProtos"; option java_outer_classname = "TransportProtos";
import "tbmsg.proto";
/** /**
* Common data structures * Common data structures
*/ */
@ -125,7 +127,8 @@ message SessionInfoProto {
message RestApiCallResponseMsgProto { message RestApiCallResponseMsgProto {
int64 requestIdMSB = 1; int64 requestIdMSB = 1;
int64 requestIdLSB = 2; int64 requestIdLSB = 2;
bytes response = 5; bytes response = 5 [deprecated = true];
msgqueue.TbMsgProto responseProto = 6;
} }
enum SessionEvent { enum SessionEvent {
@ -1700,9 +1703,10 @@ message ToCalculatedFieldNotificationMsg {
message ToRuleEngineMsg { message ToRuleEngineMsg {
int64 tenantIdMSB = 1; int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2; int64 tenantIdLSB = 2;
bytes tbMsg = 3; bytes tbMsg = 3 [deprecated = true];
repeated string relationTypes = 4; repeated string relationTypes = 4;
string failureMessage = 5; string failureMessage = 5;
msgqueue.TbMsgProto tbMsgProto = 6;
} }
message ToRuleEngineNotificationMsg { message ToRuleEngineNotificationMsg {

View File

@ -65,7 +65,7 @@ public class TbRuleEngineProducerService {
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
} }
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTbMsg(TbMsg.toByteString(tbMsg)) .setTbMsgProto(TbMsg.toProto(tbMsg))
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback); producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);