Make componentLifecycle bytes deprecated. Cleaning code with a single line for TbClusterService

This commit is contained in:
Andrii Landiak 2023-10-31 09:58:41 +02:00
parent dc6956cb3d
commit ddfac584a0
4 changed files with 22 additions and 21 deletions

View File

@ -23,8 +23,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -57,6 +55,8 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -81,6 +81,8 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.server.service.queue.ProtoUtils.toProto;
@Service @Service
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
@ -132,8 +134,7 @@ public class DefaultTbClusterService implements TbClusterService {
public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId());
log.trace("PUSHING msg: {} to:{}", msg, tpi); log.trace("PUSHING msg: {} to:{}", msg, tpi);
TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(toProto(msg)).build();
ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(proto).build();
producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback);
toCoreMsgs.incrementAndGet(); toCoreMsgs.incrementAndGet();
} }
@ -362,24 +363,22 @@ public class DefaultTbClusterService implements TbClusterService {
@Override @Override
public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);
TransportProtos.EdgeEventUpdateMsgProto edgeEventUpdateMsgProto = ProtoUtils.toProto(new EdgeEventUpdateMsg(tenantId, edgeId)); EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId);
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(edgeEventUpdateMsgProto).build(); ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(toProto(msg)).build();
pushEdgeSyncMsgToCore(edgeId, toCoreMsg); pushEdgeSyncMsgToCore(edgeId, toCoreMsg);
} }
@Override @Override
public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) {
log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest);
TransportProtos.ToEdgeSyncRequestMsgProto toEdgeSyncRequestMsgProto = ProtoUtils.toProto(toEdgeSyncRequest); ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toProto(toEdgeSyncRequest)).build();
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toEdgeSyncRequestMsgProto).build();
pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg);
} }
@Override @Override
public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) {
log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse);
TransportProtos.FromEdgeSyncResponseMsgProto fromEdgeSyncResponseMsgProto = ProtoUtils.toProto(fromEdgeSyncResponse); ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(toProto(fromEdgeSyncResponse)).build();
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(fromEdgeSyncResponseMsgProto).build();
pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg);
} }
@ -394,7 +393,7 @@ public class DefaultTbClusterService implements TbClusterService {
} }
private void broadcast(ComponentLifecycleMsg msg) { private void broadcast(ComponentLifecycleMsg msg) {
TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = ProtoUtils.toProto(msg); TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = toProto(msg);
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
EntityType entityType = msg.getEntityId().getEntityType(); EntityType entityType = msg.getEntityId().getEntityType();

View File

@ -664,7 +664,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} }
private void forwardToAppActor(UUID id, Optional<TbActorMsg> actorMsg, TbCallback callback) { private void forwardToAppActor(UUID id, Optional<TbActorMsg> actorMsg, TbCallback callback) {
actorMsg.ifPresent(tbActorMsg -> forwardToAppActor(id, tbActorMsg)); if (actorMsg.isPresent()) {
forwardToAppActor(id, actorMsg.get());
}
callback.onSuccess(); callback.onSuccess();
} }

View File

@ -15,10 +15,6 @@
*/ */
package org.thingsboard.server.service.queue; package org.thingsboard.server.service.queue;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
@ -44,11 +40,15 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -416,7 +416,7 @@ public class ProtoUtils {
} else if (proto.hasDeviceNameOrTypeMsg()) { } else if (proto.hasDeviceNameOrTypeMsg()) {
return fromProto(proto.getDeviceNameOrTypeMsg()); return fromProto(proto.getDeviceNameOrTypeMsg());
} else if (proto.hasDeviceAttributesEventMsg()) { } else if (proto.hasDeviceAttributesEventMsg()) {
return fromProto(proto.getDeviceAttributesEventMsg()); return fromProto(proto.getDeviceAttributesEventMsg());
} else if (proto.hasDeviceCredentialsUpdateMsg()) { } else if (proto.hasDeviceCredentialsUpdateMsg()) {
return fromProto(proto.getDeviceCredentialsUpdateMsg()); return fromProto(proto.getDeviceCredentialsUpdateMsg());
} else if (proto.hasToDeviceRpcRequestMsg()) { } else if (proto.hasToDeviceRpcRequestMsg()) {

View File

@ -1152,7 +1152,7 @@ message ToCoreMsg {
message ToCoreNotificationMsg { message ToCoreNotificationMsg {
LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1; LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1;
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
bytes componentLifecycleMsg = 3; //will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto bytes componentLifecycleMsg = 3 [deprecated = true];
bytes edgeEventUpdateMsg = 4 [deprecated = true]; bytes edgeEventUpdateMsg = 4 [deprecated = true];
QueueUpdateMsg queueUpdateMsg = 5; QueueUpdateMsg queueUpdateMsg = 5;
QueueDeleteMsg queueDeleteMsg = 6; QueueDeleteMsg queueDeleteMsg = 6;
@ -1177,7 +1177,7 @@ message ToRuleEngineMsg {
} }
message ToRuleEngineNotificationMsg { message ToRuleEngineNotificationMsg {
bytes componentLifecycleMsg = 1; // will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto bytes componentLifecycleMsg = 1 [deprecated = true];
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
QueueUpdateMsg queueUpdateMsg = 3; QueueUpdateMsg queueUpdateMsg = 3;
QueueDeleteMsg queueDeleteMsg = 4; QueueDeleteMsg queueDeleteMsg = 4;