Refactoring and fixes for CF lifecycle events handling

This commit is contained in:
ViacheslavKlimov 2025-03-13 17:35:00 +02:00
parent 4cb8b54a2a
commit b40fa86bac
11 changed files with 63 additions and 119 deletions

View File

@ -116,7 +116,6 @@ public class AppActor extends ContextAwareActor {
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_ENTITY_LIFECYCLE_MSG:
//TODO: use priority from the message body. For example, messages about CF lifecycle are important and Device lifecycle are not.
// same for the Linked telemetry.
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);

View File

@ -196,7 +196,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) {
if (msg.getOldProfileId() != null && msg.getOldProfileId() != msg.getProfileId()) {
if (msg.getOldProfileId() != null && !msg.getOldProfileId().equals(msg.getProfileId())) {
cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId());
if (!isMyPartition(msg.getEntityId(), callback)) {
return;

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -175,7 +176,6 @@ public class TenantActor extends RuleChainManagerActor {
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_PARTITIONS_CHANGE_MSG:
case CF_ENTITY_LIFECYCLE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
break;
case CF_TELEMETRY_MSG:
@ -315,7 +315,8 @@ public class TenantActor extends RuleChainManagerActor {
onToDeviceActorMsg(new DeviceDeleteMsg(tenantId, deviceId), true);
deletedDevices.add(deviceId);
}
if (isRuleEngine && ruleChainsInitialized) {
if (isRuleEngine) {
if (ruleChainsInitialized) {
TbActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
@ -330,6 +331,12 @@ public class TenantActor extends RuleChainManagerActor {
log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
}
}
if (cfActor != null) {
if (msg.getEntityId().getEntityType().isOneOf(EntityType.CALCULATED_FIELD, EntityType.DEVICE, EntityType.ASSET)) {
cfActor.tellWithHighPriority(new CalculatedFieldEntityLifecycleMsg(tenantId, msg));
}
}
}
}
private TbActorRef getOrCreateDeviceActor(DeviceId deviceId) {

View File

@ -28,15 +28,12 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
@ -65,8 +62,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.util.ProtoUtils.fromProto;
@Service
@TbRuleEngineComponent
@Slf4j
@ -164,9 +159,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback);
} else if (toCfMsg.hasLinkedTelemetryMsg()) {
forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback);
} else if (toCfMsg.hasComponentLifecycleMsg()) {
log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfMsg.getComponentLifecycleMsg());
forwardToActorSystem(toCfMsg.getComponentLifecycleMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -215,11 +207,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCalculatedFieldNotificationMsg> msg, TbCallback callback) {
ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue();
if (toCfNotification.hasComponentLifecycleMsg()) {
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycleMsg()));
log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfNotification.getComponentLifecycleMsg());
forwardToActorSystem(toCfNotification.getComponentLifecycleMsg(), callback);
} else if (toCfNotification.hasLinkedTelemetryMsg()) {
if (toCfNotification.hasLinkedTelemetryMsg()) {
forwardToActorSystem(toCfNotification.getLinkedTelemetryMsg(), callback);
}
}
@ -237,11 +225,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback));
}
private void forwardToActorSystem(ComponentLifecycleMsgProto proto, TbCallback callback) {
var msg = fromProto(proto);
actorContext.tell(new CalculatedFieldEntityLifecycleMsg(msg.getTenantId(), msg, callback));
}
private TenantId toTenantId(long tenantIdMSB, long tenantIdLSB) {
return TenantId.fromUUID(new UUID(tenantIdMSB, tenantIdLSB));
}

View File

@ -94,10 +94,8 @@ import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.TbRuleEngineProducerService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -146,10 +144,6 @@ public class DefaultTbClusterService implements TbClusterService {
@Lazy
private OtaPackageStateService otaPackageStateService;
@Autowired
@Lazy
private CalculatedFieldProcessingService calculatedFieldProcessingService;
private final TopicService topicService;
private final TbDeviceProfileCache deviceProfileCache;
private final TbAssetProfileCache assetProfileCache;
@ -369,13 +363,6 @@ public class DefaultTbClusterService implements TbClusterService {
toRuleEngineMsgs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS
}
@Override
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
toRuleEngineNfs.incrementAndGet();
}
@Override
public void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state) {
log.trace("[{}] Processing {} state change event: {}", tenantId, entityId.getEntityType(), state);
@ -431,7 +418,6 @@ public class DefaultTbClusterService implements TbClusterService {
public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) {
DeviceId deviceId = device.getId();
gatewayNotificationsService.onDeviceDeleted(device);
handleCalculatedFieldEntityDeleted(tenantId, deviceId);
broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback);
sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true);
broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED);
@ -440,7 +426,6 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) {
AssetId assetId = asset.getId();
handleCalculatedFieldEntityDeleted(tenantId, assetId);
broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED);
}
@ -604,6 +589,7 @@ public class DefaultTbClusterService implements TbClusterService {
|| (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED)
|| entityType.equals(EntityType.ENTITY_VIEW)
|| entityType.equals(EntityType.NOTIFICATION_RULE)
|| entityType.equals(EntityType.CALCULATED_FIELD)
) {
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
@ -658,38 +644,28 @@ public class DefaultTbClusterService implements TbClusterService {
public void onDeviceUpdated(Device entity, Device old) {
var created = old == null;
broadcastEntityChangeToTransport(entity.getTenantId(), entity.getId(), entity, null);
if (old != null) {
var msg = ComponentLifecycleMsg.builder()
.tenantId(entity.getTenantId())
.entityId(entity.getId())
.profileId(entity.getDeviceProfileId())
.name(entity.getName());
if (created) {
msg.event(ComponentLifecycleEvent.CREATED);
} else {
boolean deviceNameChanged = !entity.getName().equals(old.getName());
if (deviceNameChanged) {
gatewayNotificationsService.onDeviceUpdated(entity, old);
}
boolean deviceProfileChanged = !entity.getDeviceProfileId().equals(old.getDeviceProfileId());
if (deviceProfileChanged) {
ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder()
.tenantId(entity.getTenantId())
.entityId(entity.getId())
.event(ComponentLifecycleEvent.UPDATED)
.oldProfileId(old.getDeviceProfileId())
.profileId(entity.getDeviceProfileId())
.oldName(old.getName())
.name(entity.getName())
.build();
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY);
}
if (deviceNameChanged || deviceProfileChanged) {
pushMsgToCore(new DeviceNameOrTypeUpdateMsg(entity.getTenantId(), entity.getId(), entity.getName(), entity.getType()), null);
}
} else {
ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder()
.tenantId(entity.getTenantId())
.entityId(entity.getId())
.event(ComponentLifecycleEvent.CREATED)
.profileId(entity.getDeviceProfileId())
.name(entity.getName())
.build();
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY);
msg.event(ComponentLifecycleEvent.UPDATED)
.oldProfileId(old.getDeviceProfileId())
.oldName(old.getName());
}
broadcastEntityStateChangeEvent(entity.getTenantId(), entity.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
broadcast(msg.build());
sendDeviceStateServiceEvent(entity.getTenantId(), entity.getId(), created, !created, false);
otaPackageStateService.update(entity, old);
}
@ -697,48 +673,29 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onAssetUpdated(Asset entity, Asset old) {
var created = old == null;
if (old != null) {
boolean assetTypeChanged = !entity.getAssetProfileId().equals(old.getAssetProfileId());
if (assetTypeChanged) {
ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder()
var msg = ComponentLifecycleMsg.builder()
.tenantId(entity.getTenantId())
.entityId(entity.getId())
.event(ComponentLifecycleEvent.UPDATED)
.oldProfileId(old.getAssetProfileId())
.profileId(entity.getAssetProfileId())
.oldName(old.getName())
.name(entity.getName())
.build();
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY);
}
.name(entity.getName());
if (created) {
msg.event(ComponentLifecycleEvent.CREATED);
} else {
ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder()
.tenantId(entity.getTenantId())
.entityId(entity.getId())
.event(ComponentLifecycleEvent.CREATED)
.profileId(entity.getAssetProfileId())
.name(entity.getName())
.build();
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY);
msg.event(ComponentLifecycleEvent.UPDATED)
.oldProfileId(old.getAssetProfileId())
.oldName(old.getName());
}
broadcastEntityStateChangeEvent(entity.getTenantId(), entity.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
broadcast(msg.build());
}
@Override
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, TbQueueCallback callback) {
var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED));
onCalculatedFieldLifecycleMsg(msg, callback);
broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
}
@Override
public void onCalculatedFieldDeleted(CalculatedField calculatedField, TbQueueCallback callback) {
var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED));
onCalculatedFieldLifecycleMsg(msg, callback);
}
private void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto msg, TbQueueCallback callback) {
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(msg).build(), callback);
broadcastToCore(ToCoreNotificationMsg.newBuilder().setComponentLifecycle(msg).build());
broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED);
}
@Override
@ -868,8 +825,4 @@ public class DefaultTbClusterService implements TbClusterService {
}
}
private void handleCalculatedFieldEntityDeleted(TenantId tenantId, EntityId entityId) {
ComponentLifecycleMsg msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.DELETED);
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), TbQueueCallback.EMPTY);
}
}

View File

@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -234,7 +233,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
List<QueueKey> toRemove = consumers.keySet().stream()
.filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId()))
.collect(Collectors.toList());
.toList();
toRemove.forEach(queueKey -> {
removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(false));
});

View File

@ -85,8 +85,6 @@ public interface TbClusterService extends TbQueueClusterService {
void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback);
void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback);
void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
void onDeviceProfileChange(DeviceProfile deviceProfile, DeviceProfile oldDeviceProfile, TbQueueCallback callback);

View File

@ -86,4 +86,15 @@ public enum EntityType {
this.tableName = tableName;
}
public boolean isOneOf(EntityType... types) {
if (types == null) {
return false;
}
for (EntityType type : types) {
if (this == type) {
return true;
}
}
return false;
}
}

View File

@ -16,19 +16,16 @@
package org.thingsboard.server.common.msg.cf;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
@Data
public class CalculatedFieldEntityLifecycleMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final ComponentLifecycleMsg data;
private final TbCallback callback;
@Override
public MsgType getMsgType() {

View File

@ -1674,14 +1674,12 @@ message ToEdgeEventNotificationMsg {
}
message ToCalculatedFieldMsg {
ComponentLifecycleMsgProto componentLifecycleMsg = 1;
CalculatedFieldTelemetryMsgProto telemetryMsg = 2;
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 3;
CalculatedFieldTelemetryMsgProto telemetryMsg = 1;
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2;
}
message ToCalculatedFieldNotificationMsg {
ComponentLifecycleMsgProto componentLifecycleMsg = 1;
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2;
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 1;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */

View File

@ -24,7 +24,6 @@ import org.thingsboard.server.queue.discovery.QueueKey;
import java.io.Serial;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;