diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 50fa7e2f8d..904547a2b6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -116,7 +116,6 @@ public class AppActor extends ContextAwareActor { case CF_INIT_MSG: case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: - case CF_ENTITY_LIFECYCLE_MSG: //TODO: use priority from the message body. For example, messages about CF lifecycle are important and Device lifecycle are not. // same for the Linked telemetry. onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 2d182510e9..fc48e9ad3e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -196,7 +196,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) { - if (msg.getOldProfileId() != null && msg.getOldProfileId() != msg.getProfileId()) { + if (msg.getOldProfileId() != null && !msg.getOldProfileId().equals(msg.getProfileId())) { cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId()); if (!isMyPartition(msg.getEntityId(), callback)) { return; diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index c1a0980623..846bde508d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -49,6 +49,7 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; @@ -175,7 +176,6 @@ public class TenantActor extends RuleChainManagerActor { case CF_LINK_INIT_MSG: case CF_STATE_RESTORE_MSG: case CF_PARTITIONS_CHANGE_MSG: - case CF_ENTITY_LIFECYCLE_MSG: onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true); break; case CF_TELEMETRY_MSG: @@ -315,19 +315,26 @@ public class TenantActor extends RuleChainManagerActor { onToDeviceActorMsg(new DeviceDeleteMsg(tenantId, deviceId), true); deletedDevices.add(deviceId); } - if (isRuleEngine && ruleChainsInitialized) { - TbActorRef target = getEntityActorRef(msg.getEntityId()); - if (target != null) { - if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { - RuleChain ruleChain = systemContext.getRuleChainService(). - findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId())); - if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) { - visit(ruleChain, target); + if (isRuleEngine) { + if (ruleChainsInitialized) { + TbActorRef target = getEntityActorRef(msg.getEntityId()); + if (target != null) { + if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { + RuleChain ruleChain = systemContext.getRuleChainService(). + findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId())); + if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) { + visit(ruleChain, target); + } } + target.tellWithHighPriority(msg); + } else { + log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); + } + } + if (cfActor != null) { + if (msg.getEntityId().getEntityType().isOneOf(EntityType.CALCULATED_FIELD, EntityType.DEVICE, EntityType.ASSET)) { + cfActor.tellWithHighPriority(new CalculatedFieldEntityLifecycleMsg(tenantId, msg)); } - target.tellWithHighPriority(msg); - } else { - log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 3232a9b89f..04aab6b39d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -28,15 +28,12 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; -import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -65,8 +62,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.thingsboard.server.common.util.ProtoUtils.fromProto; - @Service @TbRuleEngineComponent @Slf4j @@ -164,9 +159,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback); } else if (toCfMsg.hasLinkedTelemetryMsg()) { forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback); - } else if (toCfMsg.hasComponentLifecycleMsg()) { - log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfMsg.getComponentLifecycleMsg()); - forwardToActorSystem(toCfMsg.getComponentLifecycleMsg(), callback); } } catch (Throwable e) { log.warn("[{}] Failed to process message: {}", id, msg, e); @@ -215,11 +207,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); - if (toCfNotification.hasComponentLifecycleMsg()) { - handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycleMsg())); - log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfNotification.getComponentLifecycleMsg()); - forwardToActorSystem(toCfNotification.getComponentLifecycleMsg(), callback); - } else if (toCfNotification.hasLinkedTelemetryMsg()) { + if (toCfNotification.hasLinkedTelemetryMsg()) { forwardToActorSystem(toCfNotification.getLinkedTelemetryMsg(), callback); } } @@ -237,11 +225,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback)); } - private void forwardToActorSystem(ComponentLifecycleMsgProto proto, TbCallback callback) { - var msg = fromProto(proto); - actorContext.tell(new CalculatedFieldEntityLifecycleMsg(msg.getTenantId(), msg, callback)); - } - private TenantId toTenantId(long tenantIdMSB, long tenantIdLSB) { return TenantId.fromUUID(new UUID(tenantIdMSB, tenantIdLSB)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index c7174469b0..92c48a5fed 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -94,10 +94,8 @@ import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -146,10 +144,6 @@ public class DefaultTbClusterService implements TbClusterService { @Lazy private OtaPackageStateService otaPackageStateService; - @Autowired - @Lazy - private CalculatedFieldProcessingService calculatedFieldProcessingService; - private final TopicService topicService; private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; @@ -369,13 +363,6 @@ public class DefaultTbClusterService implements TbClusterService { toRuleEngineMsgs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS } - @Override - public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); - producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); - toRuleEngineNfs.incrementAndGet(); - } - @Override public void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state) { log.trace("[{}] Processing {} state change event: {}", tenantId, entityId.getEntityType(), state); @@ -431,7 +418,6 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) { DeviceId deviceId = device.getId(); gatewayNotificationsService.onDeviceDeleted(device); - handleCalculatedFieldEntityDeleted(tenantId, deviceId); broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback); sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true); broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED); @@ -440,7 +426,6 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) { AssetId assetId = asset.getId(); - handleCalculatedFieldEntityDeleted(tenantId, assetId); broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED); } @@ -604,6 +589,7 @@ public class DefaultTbClusterService implements TbClusterService { || (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED) || entityType.equals(EntityType.ENTITY_VIEW) || entityType.equals(EntityType.NOTIFICATION_RULE) + || entityType.equals(EntityType.CALCULATED_FIELD) ) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); @@ -658,38 +644,28 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceUpdated(Device entity, Device old) { var created = old == null; broadcastEntityChangeToTransport(entity.getTenantId(), entity.getId(), entity, null); - if (old != null) { + + var msg = ComponentLifecycleMsg.builder() + .tenantId(entity.getTenantId()) + .entityId(entity.getId()) + .profileId(entity.getDeviceProfileId()) + .name(entity.getName()); + if (created) { + msg.event(ComponentLifecycleEvent.CREATED); + } else { boolean deviceNameChanged = !entity.getName().equals(old.getName()); if (deviceNameChanged) { gatewayNotificationsService.onDeviceUpdated(entity, old); } boolean deviceProfileChanged = !entity.getDeviceProfileId().equals(old.getDeviceProfileId()); - if (deviceProfileChanged) { - ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder() - .tenantId(entity.getTenantId()) - .entityId(entity.getId()) - .event(ComponentLifecycleEvent.UPDATED) - .oldProfileId(old.getDeviceProfileId()) - .profileId(entity.getDeviceProfileId()) - .oldName(old.getName()) - .name(entity.getName()) - .build(); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY); - } if (deviceNameChanged || deviceProfileChanged) { pushMsgToCore(new DeviceNameOrTypeUpdateMsg(entity.getTenantId(), entity.getId(), entity.getName(), entity.getType()), null); } - } else { - ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder() - .tenantId(entity.getTenantId()) - .entityId(entity.getId()) - .event(ComponentLifecycleEvent.CREATED) - .profileId(entity.getDeviceProfileId()) - .name(entity.getName()) - .build(); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY); + msg.event(ComponentLifecycleEvent.UPDATED) + .oldProfileId(old.getDeviceProfileId()) + .oldName(old.getName()); } - broadcastEntityStateChangeEvent(entity.getTenantId(), entity.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + broadcast(msg.build()); sendDeviceStateServiceEvent(entity.getTenantId(), entity.getId(), created, !created, false); otaPackageStateService.update(entity, old); } @@ -697,48 +673,29 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onAssetUpdated(Asset entity, Asset old) { var created = old == null; - if (old != null) { - boolean assetTypeChanged = !entity.getAssetProfileId().equals(old.getAssetProfileId()); - if (assetTypeChanged) { - ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder() - .tenantId(entity.getTenantId()) - .entityId(entity.getId()) - .event(ComponentLifecycleEvent.UPDATED) - .oldProfileId(old.getAssetProfileId()) - .profileId(entity.getAssetProfileId()) - .oldName(old.getName()) - .name(entity.getName()) - .build(); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY); - } + var msg = ComponentLifecycleMsg.builder() + .tenantId(entity.getTenantId()) + .entityId(entity.getId()) + .profileId(entity.getAssetProfileId()) + .name(entity.getName()); + if (created) { + msg.event(ComponentLifecycleEvent.CREATED); } else { - ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder() - .tenantId(entity.getTenantId()) - .entityId(entity.getId()) - .event(ComponentLifecycleEvent.CREATED) - .profileId(entity.getAssetProfileId()) - .name(entity.getName()) - .build(); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY); + msg.event(ComponentLifecycleEvent.UPDATED) + .oldProfileId(old.getAssetProfileId()) + .oldName(old.getName()); } - broadcastEntityStateChangeEvent(entity.getTenantId(), entity.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + broadcast(msg.build()); } @Override public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, TbQueueCallback callback) { - var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED)); - onCalculatedFieldLifecycleMsg(msg, callback); + broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } @Override public void onCalculatedFieldDeleted(CalculatedField calculatedField, TbQueueCallback callback) { - var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED)); - onCalculatedFieldLifecycleMsg(msg, callback); - } - - private void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto msg, TbQueueCallback callback) { - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(msg).build(), callback); - broadcastToCore(ToCoreNotificationMsg.newBuilder().setComponentLifecycle(msg).build()); + broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED); } @Override @@ -868,8 +825,4 @@ public class DefaultTbClusterService implements TbClusterService { } } - private void handleCalculatedFieldEntityDeleted(TenantId tenantId, EntityId entityId) { - ComponentLifecycleMsg msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.DELETED); - broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index d522f11f7b..9cc743e510 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rpc.RpcError; -import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -234,7 +233,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (event.getEvent() == ComponentLifecycleEvent.DELETED) { List toRemove = consumers.keySet().stream() .filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId())) - .collect(Collectors.toList()); + .toList(); toRemove.forEach(queueKey -> { removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(false)); }); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 2a2d04e0fd..aed6eb4cf5 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -85,8 +85,6 @@ public interface TbClusterService extends TbQueueClusterService { void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback); - void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback); - void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state); void onDeviceProfileChange(DeviceProfile deviceProfile, DeviceProfile oldDeviceProfile, TbQueueCallback callback); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java index 4ea66be1b4..1c56aa25dd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java @@ -86,4 +86,15 @@ public enum EntityType { this.tableName = tableName; } + public boolean isOneOf(EntityType... types) { + if (types == null) { + return false; + } + for (EntityType type : types) { + if (this == type) { + return true; + } + } + return false; + } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldEntityLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldEntityLifecycleMsg.java index 099240f54d..1eb8f425ab 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldEntityLifecycleMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldEntityLifecycleMsg.java @@ -16,19 +16,16 @@ package org.thingsboard.server.common.msg.cf; import lombok.Data; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; -import org.thingsboard.server.common.msg.queue.TbCallback; @Data public class CalculatedFieldEntityLifecycleMsg implements ToCalculatedFieldSystemMsg { private final TenantId tenantId; private final ComponentLifecycleMsg data; - private final TbCallback callback; @Override public MsgType getMsgType() { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 6ecdf13981..1b2edcc96d 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1674,14 +1674,12 @@ message ToEdgeEventNotificationMsg { } message ToCalculatedFieldMsg { - ComponentLifecycleMsgProto componentLifecycleMsg = 1; - CalculatedFieldTelemetryMsgProto telemetryMsg = 2; - CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 3; + CalculatedFieldTelemetryMsgProto telemetryMsg = 1; + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2; } message ToCalculatedFieldNotificationMsg { - ComponentLifecycleMsgProto componentLifecycleMsg = 1; - CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2; + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 1; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index f165f60be7..32537edba5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -24,7 +24,6 @@ import org.thingsboard.server.queue.discovery.QueueKey; import java.io.Serial; import java.util.Collection; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors;