Merge pull request #13323 from thingsboard/feature/prod-4738
Used TbMsgProto instead of ByteString
This commit is contained in:
commit
a3f36632be
@ -235,7 +235,8 @@ public class DefaultTbContext implements TbContext {
|
|||||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
|
.build();
|
||||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
|
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,7 +315,7 @@ public class DefaultTbContext implements TbContext {
|
|||||||
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg))
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.addAllRelationTypes(relationTypes);
|
.addAllRelationTypes(relationTypes);
|
||||||
if (failureMessage != null) {
|
if (failureMessage != null) {
|
||||||
msg.setFailureMessage(failureMessage);
|
msg.setFailureMessage(failureMessage);
|
||||||
|
|||||||
@ -381,7 +381,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
|||||||
ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
|
ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(newMsg))
|
.setTbMsgProto(TbMsg.toProto(newMsg))
|
||||||
.build();
|
.build();
|
||||||
clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
|
clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -210,7 +210,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
|||||||
TransportProtos.ToRuleEngineMsg toQueueMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
TransportProtos.ToRuleEngineMsg toQueueMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg))
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.build();
|
.build();
|
||||||
systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null);
|
systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null);
|
||||||
defaultCtx.ack(source);
|
defaultCtx.ack(source);
|
||||||
|
|||||||
@ -284,7 +284,8 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
|
|||||||
|
|
||||||
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
|
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
|
||||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
||||||
ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
|
ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||||
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -34,7 +35,7 @@ public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends Sequenti
|
|||||||
@Override
|
@Override
|
||||||
protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
|
protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
|
||||||
try {
|
try {
|
||||||
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg());
|
MsgProtos.TbMsgProto proto = ProtoUtils.getTbMsgProto(msg);
|
||||||
return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg);
|
log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg);
|
||||||
|
|||||||
@ -19,8 +19,8 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
|
||||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||||
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
|
|
||||||
@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory {
|
|||||||
}
|
}
|
||||||
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
|
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY)));
|
||||||
}
|
}
|
||||||
if (pauseBetweenRetries > 0) {
|
if (pauseBetweenRetries > 0) {
|
||||||
try {
|
try {
|
||||||
@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory {
|
|||||||
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
|
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
|
||||||
}
|
}
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY)));
|
||||||
}
|
}
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, ProtoUtils.fromTbMsgProto(result.getQueueName(), msg.getValue(), TbMsgCallback.EMPTY)));
|
||||||
}
|
}
|
||||||
return new TbRuleEngineProcessingDecision(true, null);
|
return new TbRuleEngineProcessingDecision(true, null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
|
|||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
@ -178,7 +179,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
|||||||
new TbMsgPackCallback(id, tenantId, packCtx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
|
new TbMsgPackCallback(id, tenantId, packCtx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
|
||||||
new TbMsgPackCallback(id, tenantId, packCtx);
|
new TbMsgPackCallback(id, tenantId, packCtx);
|
||||||
try {
|
try {
|
||||||
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
|
if (!toRuleEngineMsg.getTbMsg().isEmpty() || toRuleEngineMsg.hasTbMsgProto()) {
|
||||||
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
|
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
|
||||||
} else {
|
} else {
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
@ -189,7 +190,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
||||||
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
|
TbMsg tbMsg = ProtoUtils.fromTbMsgProto(queueName, toRuleEngineMsg, callback);
|
||||||
QueueToRuleEngineMsg msg;
|
QueueToRuleEngineMsg msg;
|
||||||
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
|
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
|
||||||
Set<String> relationTypes;
|
Set<String> relationTypes;
|
||||||
@ -207,7 +208,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
|||||||
log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size());
|
log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size());
|
||||||
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
|
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
|
||||||
ToRuleEngineMsg tmp = pending.getValue().getValue();
|
ToRuleEngineMsg tmp = pending.getValue().getValue();
|
||||||
TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
|
TbMsg tmpMsg = ProtoUtils.fromTbMsgProto(config.getName(), tmp, TbMsgCallback.EMPTY);
|
||||||
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
|
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
|
||||||
if (printAll) {
|
if (printAll) {
|
||||||
log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
|
log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
|
||||||
@ -236,7 +237,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
|||||||
}
|
}
|
||||||
for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) {
|
for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) {
|
||||||
try {
|
try {
|
||||||
MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray());
|
MsgProtos.TbMsgProto tbMsgProto = ProtoUtils.getTbMsgProto(msg.getValue());
|
||||||
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
|
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
|
||||||
|
|
||||||
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator);
|
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator);
|
||||||
|
|||||||
@ -133,7 +133,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
|
|||||||
TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
||||||
.setRequestIdMSB(requestId.getMostSignificantBits())
|
.setRequestIdMSB(requestId.getMostSignificantBits())
|
||||||
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
||||||
.setResponse(TbMsg.toByteString(tbMsg))
|
.setResponseProto(TbMsg.toProto(tbMsg))
|
||||||
.build();
|
.build();
|
||||||
clusterService.pushNotificationToCore(serviceId, msg, null);
|
clusterService.pushNotificationToCore(serviceId, msg, null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -73,7 +73,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService {
|
|||||||
UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB());
|
UUID requestId = new UUID(restApiCallResponseMsg.getRequestIdMSB(), restApiCallResponseMsg.getRequestIdLSB());
|
||||||
Consumer<TbMsg> consumer = requests.remove(requestId);
|
Consumer<TbMsg> consumer = requests.remove(requestId);
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
consumer.accept(TbMsg.fromBytes(null, restApiCallResponseMsg.getResponse().toByteArray(), TbMsgCallback.EMPTY));
|
consumer.accept(TbMsg.fromProto(null, restApiCallResponseMsg.getResponseProto(), restApiCallResponseMsg.getResponse(), TbMsgCallback.EMPTY));
|
||||||
} else {
|
} else {
|
||||||
log.trace("[{}] Unknown or stale rest api call response received", requestId);
|
log.trace("[{}] Unknown or stale rest api call response received", requestId);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -770,11 +770,11 @@ class DefaultTbContextTest {
|
|||||||
|
|
||||||
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
||||||
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
||||||
.ignoringFields("tbMsg_")
|
.ignoringFields("tbMsgProto_")
|
||||||
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
|
.setTbMsgProto(TbMsg.toProto(expectedTbMsg))
|
||||||
.addAllRelationTypes(List.of(connectionType)).build());
|
.addAllRelationTypes(List.of(connectionType)).build());
|
||||||
|
|
||||||
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
|
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
|
||||||
@ -827,11 +827,11 @@ class DefaultTbContextTest {
|
|||||||
|
|
||||||
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
||||||
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
||||||
.ignoringFields("tbMsg_")
|
.ignoringFields("tbMsgProto_")
|
||||||
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
|
.setTbMsgProto(TbMsg.toProto(expectedTbMsg))
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
|
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
|
||||||
@ -1009,11 +1009,11 @@ class DefaultTbContextTest {
|
|||||||
|
|
||||||
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
|
||||||
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
|
||||||
.ignoringFields("tbMsg_")
|
.ignoringFields("tbMsgProto_.id_")
|
||||||
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
.isEqualTo(ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
|
.setTbMsgProto(TbMsg.toProto(expectedTbMsg))
|
||||||
.setFailureMessage(EXCEPTION_MSG)
|
.setFailureMessage(EXCEPTION_MSG)
|
||||||
.addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build());
|
.addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build());
|
||||||
|
|
||||||
|
|||||||
@ -762,7 +762,8 @@ public class TenantControllerTest extends AbstractControllerTest {
|
|||||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
|
.build();
|
||||||
tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null);
|
tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null);
|
||||||
return tbMsg;
|
return tbMsg;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -737,7 +737,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
.setTenantIdMSB(tenantId.getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getLeastSignificantBits())
|
.setTenantIdLSB(tenantId.getLeastSignificantBits())
|
||||||
.addRelationTypes("Success")
|
.addRelationTypes("Success")
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg))
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -263,7 +263,7 @@ public class TbRuleEngineStrategyTest {
|
|||||||
.setTenantIdMSB(tenantId.getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getLeastSignificantBits())
|
.setTenantIdLSB(tenantId.getLeastSignificantBits())
|
||||||
.addRelationTypes("Success")
|
.addRelationTypes("Success")
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg))
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -55,7 +55,7 @@ class DefaultTbRuleEngineRpcServiceTest {
|
|||||||
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
var restApiCallResponseMsgProto = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
||||||
.setRequestIdMSB(requestId.getMostSignificantBits())
|
.setRequestIdMSB(requestId.getMostSignificantBits())
|
||||||
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
||||||
.setResponse(TbMsg.toByteString(msg))
|
.setResponseProto(TbMsg.toProto(msg))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// WHEN
|
// WHEN
|
||||||
|
|||||||
@ -144,7 +144,7 @@ public class DefaultRuleEngineCallServiceTest {
|
|||||||
|
|
||||||
private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) {
|
private TransportProtos.RestApiCallResponseMsgProto getResponse(UUID requestId, TbMsg msg) {
|
||||||
return TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
return TransportProtos.RestApiCallResponseMsgProto.newBuilder()
|
||||||
.setResponse(TbMsg.toByteString(msg))
|
.setResponseProto(TbMsg.toProto(msg))
|
||||||
.setRequestIdMSB(requestId.getMostSignificantBits())
|
.setRequestIdMSB(requestId.getMostSignificantBits())
|
||||||
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
.setRequestIdLSB(requestId.getLeastSignificantBits())
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
|
|||||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||||
|
import org.thingsboard.server.common.msg.gen.MsgProtos.TbMsgProto;
|
||||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
@ -151,12 +152,8 @@ public final class TbMsg implements Serializable {
|
|||||||
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
|
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ByteString toByteString(TbMsg msg) {
|
public static TbMsgProto toProto(TbMsg msg) {
|
||||||
return ByteString.copyFrom(toByteArray(msg));
|
TbMsgProto.Builder builder = TbMsgProto.newBuilder();
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] toByteArray(TbMsg msg) {
|
|
||||||
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
|
|
||||||
builder.setId(msg.getId().toString());
|
builder.setId(msg.getId().toString());
|
||||||
builder.setTs(msg.getTs());
|
builder.setTs(msg.getTs());
|
||||||
builder.setType(msg.getType());
|
builder.setType(msg.getType());
|
||||||
@ -205,12 +202,22 @@ public final class TbMsg implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
builder.setCtx(msg.ctx.toProto());
|
builder.setCtx(msg.ctx.toProto());
|
||||||
return builder.build().toByteArray();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) {
|
@Deprecated(forRemoval = true, since = "4.1") // to be removed in 4.2
|
||||||
|
public static TbMsg fromProto(String queueName, TbMsgProto proto, ByteString data, TbMsgCallback callback) {
|
||||||
try {
|
try {
|
||||||
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
|
if (!data.isEmpty()) {
|
||||||
|
proto = TbMsgProto.parseFrom(data);
|
||||||
|
}
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
||||||
|
}
|
||||||
|
return fromProto(queueName, proto, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TbMsg fromProto(String queueName, TbMsgProto proto, TbMsgCallback callback) {
|
||||||
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
|
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
|
||||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||||
CustomerId customerId = null;
|
CustomerId customerId = null;
|
||||||
@ -240,21 +247,10 @@ public final class TbMsg implements Serializable {
|
|||||||
));
|
));
|
||||||
calculatedFieldIds.add(calculatedFieldId);
|
calculatedFieldIds.add(calculatedFieldId);
|
||||||
}
|
}
|
||||||
|
TbMsgProcessingCtx ctx = TbMsgProcessingCtx.fromProto(proto.getCtx());
|
||||||
TbMsgProcessingCtx ctx;
|
|
||||||
if (proto.hasCtx()) {
|
|
||||||
ctx = TbMsgProcessingCtx.fromProto(proto.getCtx());
|
|
||||||
} else {
|
|
||||||
// Backward compatibility with unprocessed messages fetched from queue after update.
|
|
||||||
ctx = new TbMsgProcessingCtx(proto.getRuleNodeExecCounter());
|
|
||||||
}
|
|
||||||
|
|
||||||
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
||||||
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
|
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
|
||||||
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
|
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
|
||||||
} catch (InvalidProtocolBufferException e) {
|
|
||||||
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getAndIncrementRuleNodeCounter() {
|
public int getAndIncrementRuleNodeCounter() {
|
||||||
|
|||||||
@ -59,8 +59,8 @@ message TbMsgProto {
|
|||||||
string data = 14;
|
string data = 14;
|
||||||
|
|
||||||
int64 ts = 15;
|
int64 ts = 15;
|
||||||
// Will be removed in 3.4. Moved to processing context
|
|
||||||
int32 ruleNodeExecCounter = 16;
|
// ruleNodeExecCounter (16) was removed in 4.1
|
||||||
|
|
||||||
int64 customerIdMSB = 17;
|
int64 customerIdMSB = 17;
|
||||||
int64 customerIdLSB = 18;
|
int64 customerIdLSB = 18;
|
||||||
|
|||||||
@ -18,6 +18,8 @@ package org.thingsboard.server.common.util;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
@ -76,12 +78,15 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
|
|||||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||||
import org.thingsboard.server.common.data.sync.vc.RepositoryAuthMethod;
|
import org.thingsboard.server.common.data.sync.vc.RepositoryAuthMethod;
|
||||||
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
||||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
||||||
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
|
import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg;
|
||||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||||
|
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||||
|
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
|
||||||
import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg;
|
import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg;
|
||||||
@ -93,8 +98,8 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg;
|
|||||||
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
|
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
|
||||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.ApiUsageRecordKeyProto;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -1344,6 +1349,21 @@ public class ProtoUtils {
|
|||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated(forRemoval = true, since = "4.1")
|
||||||
|
public static MsgProtos.TbMsgProto getTbMsgProto(TransportProtos.ToRuleEngineMsg ruleEngineMsg) throws InvalidProtocolBufferException {
|
||||||
|
if (ruleEngineMsg.getTbMsg().isEmpty()) {
|
||||||
|
return ruleEngineMsg.getTbMsgProto();
|
||||||
|
} else {
|
||||||
|
return MsgProtos.TbMsgProto.parseFrom(ruleEngineMsg.getTbMsg());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@Deprecated(forRemoval = true, since = "4.1") // inline to TbMsg.fromProto(queueName, ruleEngineMsg.getTbMsgProto(), callback)
|
||||||
|
public static TbMsg fromTbMsgProto(String queueName, TransportProtos.ToRuleEngineMsg ruleEngineMsg, TbMsgCallback callback) {
|
||||||
|
return TbMsg.fromProto(queueName, getTbMsgProto(ruleEngineMsg), callback);
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isNotNull(Object obj) {
|
private static boolean isNotNull(Object obj) {
|
||||||
return obj != null;
|
return obj != null;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,8 @@ package transport;
|
|||||||
option java_package = "org.thingsboard.server.gen.transport";
|
option java_package = "org.thingsboard.server.gen.transport";
|
||||||
option java_outer_classname = "TransportProtos";
|
option java_outer_classname = "TransportProtos";
|
||||||
|
|
||||||
|
import "tbmsg.proto";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common data structures
|
* Common data structures
|
||||||
*/
|
*/
|
||||||
@ -125,7 +127,8 @@ message SessionInfoProto {
|
|||||||
message RestApiCallResponseMsgProto {
|
message RestApiCallResponseMsgProto {
|
||||||
int64 requestIdMSB = 1;
|
int64 requestIdMSB = 1;
|
||||||
int64 requestIdLSB = 2;
|
int64 requestIdLSB = 2;
|
||||||
bytes response = 5;
|
bytes response = 5 [deprecated = true];
|
||||||
|
msgqueue.TbMsgProto responseProto = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum SessionEvent {
|
enum SessionEvent {
|
||||||
@ -1700,9 +1703,10 @@ message ToCalculatedFieldNotificationMsg {
|
|||||||
message ToRuleEngineMsg {
|
message ToRuleEngineMsg {
|
||||||
int64 tenantIdMSB = 1;
|
int64 tenantIdMSB = 1;
|
||||||
int64 tenantIdLSB = 2;
|
int64 tenantIdLSB = 2;
|
||||||
bytes tbMsg = 3;
|
bytes tbMsg = 3 [deprecated = true]; // for removal in 4.2
|
||||||
repeated string relationTypes = 4;
|
repeated string relationTypes = 4;
|
||||||
string failureMessage = 5;
|
string failureMessage = 5;
|
||||||
|
msgqueue.TbMsgProto tbMsgProto = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ToRuleEngineNotificationMsg {
|
message ToRuleEngineNotificationMsg {
|
||||||
|
|||||||
@ -65,7 +65,7 @@ public class TbRuleEngineProducerService {
|
|||||||
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
|
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
|
||||||
}
|
}
|
||||||
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
|
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
|
||||||
.setTbMsg(TbMsg.toByteString(tbMsg))
|
.setTbMsgProto(TbMsg.toProto(tbMsg))
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
||||||
producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
|
producer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user