diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java index f953e57cc3..ad683c324c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -34,6 +34,8 @@ public interface CalculatedFieldCache { List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); + void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId); + CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService); Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index dd2ab3857c..a4077b599b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -141,6 +141,31 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return cfLinks; } + + @Override + public void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + log.debug("Update calculated field links per entity for calculated field: [{}]", calculatedFieldId); + calculatedFieldFetchLock.lock(); + try { + List cfLinks = getCalculatedFieldLinks(tenantId, calculatedFieldId); + if (cfLinks != null && !cfLinks.isEmpty()) { + cfLinks.forEach(link -> { + entityIdCalculatedFieldLinks.compute(link.getEntityId(), (id, existingList) -> { + if (existingList == null) { + existingList = new ArrayList<>(); + } else if (!(existingList instanceof ArrayList)) { + existingList = new ArrayList<>(existingList); + } + existingList.add(link); + return existingList; + }); + }); + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + @Override public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) { CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 976139e8bd..2f55de6b77 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -71,7 +71,6 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; @@ -105,7 +104,6 @@ import static org.thingsboard.server.common.data.DataConstants.SCOPE; import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto; import static org.thingsboard.server.common.util.ProtoUtils.toObjectProto; -@TbCoreComponent @Service @Slf4j @RequiredArgsConstructor @@ -117,7 +115,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private final CalculatedFieldCache calculatedFieldCache; private final AttributesService attributesService; private final TimeseriesService timeseriesService; - private final RocksDBService rocksDBService; + // private final RocksDBService rocksDBService; private final TbClusterService clusterService; private final TbelInvokeService tbelInvokeService; @@ -213,8 +211,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void restoreState(CalculatedField cf, EntityId entityId) { CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId()); - String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)); +// String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)); + String storedState = null; if (storedState != null) { CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class); states.put(ctxId, restoredCtx); @@ -313,13 +312,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (calculatedFieldIds != null) { calculatedFieldIds.remove(calculatedFieldId); } - calculatedFieldCache.evict(calculatedFieldId); +// calculatedFieldCache.evict(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); List statesToRemove = states.keySet().stream() .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())) .map(JacksonUtil::writeValueAsString) .toList(); - rocksDBService.deleteAll(statesToRemove); +// rocksDBService.deleteAll(statesToRemove); } catch (Exception e) { log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e); callback.onFailure(e); @@ -414,7 +413,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, calculatedFieldIds); } - log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId); } @Override @@ -423,7 +421,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - + log.info("Received CalculatedFieldStateMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}], entityId=[{}]", tenantId, calculatedFieldId, entityId); if (proto.getClear()) { clearState(tenantId, calculatedFieldId, entityId); return; @@ -431,7 +429,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas List calculatedFieldIds = proto.getCalculatedFieldsList().stream() .map(cfIdProto -> new CalculatedFieldId(new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) - .toList(); + .collect(Collectors.toCollection(ArrayList::new)); Map argumentsMap = proto.getArgumentsMap().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue()))); @@ -451,8 +449,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB())); log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); - calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId); - calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId); +// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId); +// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId); calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> clearState(tenantId, cfId, entityId)); @@ -472,15 +470,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); if (proto.getDeleted()) { log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); - calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId); +// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId); + + List calculatedFieldIds = Stream.concat( calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId), calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId) ).toList(); + calculatedFieldIds.forEach(cfId -> clearState(tenantId, cfId, entityId)); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); - calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId); +// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId); initializeStateForEntityByProfile(tenantId, entityId, profileId, callback); } } catch (Exception e) { @@ -494,7 +495,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId); CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId()); states.remove(ctxId); - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); +// rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); } else { sendClearCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId); } @@ -558,13 +559,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Consumer performUpdateState = (state) -> { if (state.updateState(argumentsMap)) { calculatedFieldEntityCtx.setState(state); - rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); +// rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); Map arguments = state.getArguments(); boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); if (allArgsPresent) { performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds); } + log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId); } updateFuture.complete(null); }; @@ -633,6 +635,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldIds.add(calculatedFieldId); TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).calculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); + log.info("Pushed message to rule engine: originatorId=[{}]", originatorId); } catch (Exception e) { log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e); } @@ -715,6 +718,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas )); } + log.info("Sending calculated field state msg from entityId [{}]", entityId); clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); } @@ -790,11 +794,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { - String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); - if (stateStr == null) { - return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); - } - return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); +// String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); +// if (stateStr == null) { + return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); +// } +// return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); } private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 6316d1fdb2..551b0ddbc2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -391,7 +391,7 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) { DeviceId deviceId = device.getId(); gatewayNotificationsService.onDeviceDeleted(device); - sendProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true); + handleProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true); broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback); sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true); broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED); @@ -400,7 +400,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) { AssetId assetId = asset.getId(); - sendProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), false, true); + handleProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), true, true); broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED); } @@ -563,7 +563,9 @@ public class DefaultTbClusterService implements TbClusterService { || entityType.equals(EntityType.API_USAGE_STATE) || (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED) || entityType.equals(EntityType.ENTITY_VIEW) - || entityType.equals(EntityType.NOTIFICATION_RULE)) { + || entityType.equals(EntityType.NOTIFICATION_RULE) + || entityType.equals(EntityType.CALCULATED_FIELD) + ) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); for (String serviceId : tbCoreServices) { @@ -624,13 +626,13 @@ public class DefaultTbClusterService implements TbClusterService { } boolean deviceTypeChanged = !device.getType().equals(old.getType()); if (deviceTypeChanged) { - sendEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); + handleEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); } if (deviceNameChanged || deviceTypeChanged) { pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null); } } else { - sendProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false); + handleProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false); } broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false); @@ -644,10 +646,10 @@ public class DefaultTbClusterService implements TbClusterService { if (old != null) { boolean assetTypeChanged = !asset.getType().equals(old.getType()); if (assetTypeChanged) { - sendEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); + handleEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); } } else { - sendProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false); + handleProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false); } broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } @@ -792,8 +794,8 @@ public class DefaultTbClusterService implements TbClusterService { public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback) { CalculatedFieldId calculatedFieldId = calculatedField.getId(); broadcastEntityDeleteToTransport(tenantId, calculatedFieldId, calculatedField.getName(), callback); - sendCalculatedFieldEvent(tenantId, calculatedFieldId, false, false, true); broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED); + sendCalculatedFieldEvent(tenantId, calculatedFieldId, false, false, true); } private void sendCalculatedFieldEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, boolean added, boolean updated, boolean deleted) { @@ -809,7 +811,7 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(tenantId, calculatedFieldId, ToCoreMsg.newBuilder().setCalculatedFieldMsg(msg).build(), null); } - private void sendEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { + private void handleEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); @@ -822,10 +824,12 @@ public class DefaultTbClusterService implements TbClusterService { builder.setNewProfileIdMSB(newProfileId.getId().getMostSignificantBits()); builder.setNewProfileIdLSB(newProfileId.getId().getLeastSignificantBits()); TransportProtos.EntityProfileUpdateMsgProto msg = builder.build(); + + broadcastToCore(ToCoreNotificationMsg.newBuilder().setEntityProfileUpdateMsg(msg).build()); pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null); } - private void sendProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) { + private void handleProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) { TransportProtos.ProfileEntityMsgProto.Builder builder = TransportProtos.ProfileEntityMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); @@ -838,6 +842,8 @@ public class DefaultTbClusterService implements TbClusterService { builder.setAdded(added); builder.setDeleted(deleted); TransportProtos.ProfileEntityMsgProto msg = builder.build(); + + broadcastToCore(ToCoreNotificationMsg.newBuilder().setProfileEntityMsg(msg).build()); pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setProfileEntityMsg(msg).build(), null); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 1be25b308f..6baa75b3ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.event.Event; import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.NotificationRequestId; import org.thingsboard.server.common.data.id.TenantId; @@ -87,6 +88,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.notification.NotificationSchedulerService; import org.thingsboard.server.service.ota.OtaPackageStateService; @@ -109,6 +111,7 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpd import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -181,8 +184,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId); + if (added) { + entitiesByProfile.add(entityId); + } else { + entitiesByProfile.remove(entityId); + } + } + private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) { if (msg.hasSubEvent()) { TbEntitySubEventProto subEvent = msg.getSubEvent(); @@ -688,12 +718,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChangedMsg(profileUpdateMsg, callback)); DonAsynchron.withCallback(future, __ -> callback.onSuccess(), t -> { - log.warn("[{}] Failed to process device type updated message for device [{}]", tenantId.getId(), entityId.getId(), t); + log.warn("[{}] Failed to process entity profile updated message for entity [{}]", tenantId.getId(), entityId.getId(), t); callback.onFailure(t); }); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java index f219d7ae69..35c7894f2f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java @@ -91,7 +91,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService future = edgeCtx.getTenantProfileProcessor().processEntityNotification(tenantId, edgeNotificationMsg); case NOTIFICATION_RULE, NOTIFICATION_TARGET, NOTIFICATION_TEMPLATE -> future = edgeCtx.getNotificationEdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg); - case TB_RESOURCE -> future = edgeCtx.getResourceProcessor().processEntityNotification(tenantId, edgeNotificationMsg); - case DOMAIN, OAUTH2_CLIENT -> future = edgeCtx.getOAuth2EdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg); + case TB_RESOURCE -> + future = edgeCtx.getResourceProcessor().processEntityNotification(tenantId, edgeNotificationMsg); + case DOMAIN, OAUTH2_CLIENT -> + future = edgeCtx.getOAuth2EdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg); default -> { future = Futures.immediateFuture(null); log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 73810949b3..7d4d975cb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -46,6 +46,7 @@ import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; @@ -83,8 +84,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbApiUsageStateService apiUsageStateService, PartitionService partitionService, ApplicationEventPublisher eventPublisher, - JwtSettingsService jwtSettingsService) { - super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); + JwtSettingsService jwtSettingsService, + CalculatedFieldCache calculatedFieldCache) { + super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.ctx = ctx; this.tbDeviceRpcService = tbDeviceRpcService; this.queueService = queueService; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 95fc8cb843..2aaed13ec1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -43,6 +44,7 @@ import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbPackCallback; @@ -68,6 +70,7 @@ public abstract class AbstractConsumerService