diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 7241c0e571..d5aa187603 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -23,8 +23,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.Device; @@ -57,6 +55,8 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -81,6 +81,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import static org.thingsboard.server.service.queue.ProtoUtils.toProto; + @Service @Slf4j @RequiredArgsConstructor @@ -132,8 +134,7 @@ public class DefaultTbClusterService implements TbClusterService { public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); log.trace("PUSHING msg: {} to:{}", msg, tpi); - TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); - ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(proto).build(); + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(toProto(msg)).build(); producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); toCoreMsgs.incrementAndGet(); } @@ -362,24 +363,22 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); - TransportProtos.EdgeEventUpdateMsgProto edgeEventUpdateMsgProto = ProtoUtils.toProto(new EdgeEventUpdateMsg(tenantId, edgeId)); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(edgeEventUpdateMsgProto).build(); + EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(toProto(msg)).build(); pushEdgeSyncMsgToCore(edgeId, toCoreMsg); } @Override public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); - TransportProtos.ToEdgeSyncRequestMsgProto toEdgeSyncRequestMsgProto = ProtoUtils.toProto(toEdgeSyncRequest); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toEdgeSyncRequestMsgProto).build(); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toProto(toEdgeSyncRequest)).build(); pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); } @Override public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); - TransportProtos.FromEdgeSyncResponseMsgProto fromEdgeSyncResponseMsgProto = ProtoUtils.toProto(fromEdgeSyncResponse); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(fromEdgeSyncResponseMsgProto).build(); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(toProto(fromEdgeSyncResponse)).build(); pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); } @@ -394,7 +393,7 @@ public class DefaultTbClusterService implements TbClusterService { } private void broadcast(ComponentLifecycleMsg msg) { - TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = ProtoUtils.toProto(msg); + TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = toProto(msg); TbQueueProducer> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); EntityType entityType = msg.getEntityId().getEntityType(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 56038214d0..57be0c8de4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -664,7 +664,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { - actorMsg.ifPresent(tbActorMsg -> forwardToAppActor(id, tbActorMsg)); + if (actorMsg.isPresent()) { + forwardToAppActor(id, actorMsg.get()); + } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java index 539f26ef03..76a8e3e89a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java @@ -15,10 +15,6 @@ */ package org.thingsboard.server.service.queue; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -44,11 +40,15 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.ArrayList; import java.util.Arrays; @@ -416,7 +416,7 @@ public class ProtoUtils { } else if (proto.hasDeviceNameOrTypeMsg()) { return fromProto(proto.getDeviceNameOrTypeMsg()); } else if (proto.hasDeviceAttributesEventMsg()) { - return fromProto(proto.getDeviceAttributesEventMsg()); + return fromProto(proto.getDeviceAttributesEventMsg()); } else if (proto.hasDeviceCredentialsUpdateMsg()) { return fromProto(proto.getDeviceCredentialsUpdateMsg()); } else if (proto.hasToDeviceRpcRequestMsg()) { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index f79cf0b731..480a7f3ba4 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -1152,7 +1152,7 @@ message ToCoreMsg { message ToCoreNotificationMsg { LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; - bytes componentLifecycleMsg = 3; //will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto + bytes componentLifecycleMsg = 3 [deprecated = true]; bytes edgeEventUpdateMsg = 4 [deprecated = true]; QueueUpdateMsg queueUpdateMsg = 5; QueueDeleteMsg queueDeleteMsg = 6; @@ -1177,7 +1177,7 @@ message ToRuleEngineMsg { } message ToRuleEngineNotificationMsg { - bytes componentLifecycleMsg = 1; // will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto + bytes componentLifecycleMsg = 1 [deprecated = true]; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; QueueUpdateMsg queueUpdateMsg = 3; QueueDeleteMsg queueDeleteMsg = 4;