diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index 4812ed6652..4879fa4566 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -75,8 +75,8 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { case CF_LINKED_TELEMETRY_MSG: processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg); break; - case CF_ENTITY_CHECK_FOR_UPDATES_MSG: - processor.process((EntityCalculatedFieldCheckForUpdatesMsg) msg); + case CF_ENTITY_MARK_STATE_DIRTY_MSG: + processor.process((EntityCalculatedFieldMarkStateDirtyMsg) msg); break; default: return false; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 95b705adfc..6d832a9f23 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -228,29 +228,16 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException { - CalculatedFieldCtx cfCtx = msg.getCfCtx(); - CalculatedFieldId cfId = cfCtx.getCfId(); - log.debug("[{}][{}] Processing CF check for updates msg.", entityId, cfId); - CalculatedFieldState currentState = states.get(cfId); - try { - var stateFromDb = getStateFromDb(cfCtx); - if (currentState.equals(stateFromDb)) { - log.debug("[{}][{}] CF state is up-to-date.", entityId, cfId); - return; - } - states.put(cfId, stateFromDb); - if (stateFromDb.isSizeOk()) { - processStateIfReady(cfCtx, Collections.singletonList(cfId), stateFromDb, null, null, msg.getCallback()); - } else { - throw new RuntimeException(cfCtx.getSizeExceedsLimitMessage()); - } - } catch (Exception e) { - if (e instanceof CalculatedFieldException cfe) { - throw cfe; - } - throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build(); + public void process(EntityCalculatedFieldMarkStateDirtyMsg msg) throws CalculatedFieldException { + log.debug("[{}][{}] Processing entity CF invalidation msg.", entityId, msg.getCfId()); + CalculatedFieldState currentState = states.get(msg.getCfId()); + if (currentState == null) { + log.debug("[{}][{}] Failed to find CF state for entity.", entityId, msg.getCfId()); + } else { + currentState.setDirty(true); + log.debug("[{}][{}] CF state marked as dirty.", entityId, msg.getCfId()); } + msg.getCallback().onSuccess(); } private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { @@ -280,6 +267,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state == null) { state = getOrInitState(ctx); justRestored = true; + } else if (state.isDirty()) { + log.debug("[{}][{}] Going to update dirty CF state.", entityId, ctx.getCfId()); + try { + Map dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId); + dynamicArgsFromDb.forEach(newArgValues::putIfAbsent); + state.setDirty(false); + } catch (Exception e) { + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); + } } if (state.isSizeOk()) { if (state.updateState(ctx, newArgValues) || justRestored) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 5adca78fa9..8494fb3847 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -91,8 +91,8 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { case CF_LINKED_TELEMETRY_MSG: processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg); break; - case CF_SCHEDULED_CHECK_FOR_UPDATES_MSG: - processor.onScheduledCheckForUpdatesMsg((CalculatedFieldScheduledCheckForUpdatesMsg) msg); + case CF_SCHEDULED_INVALIDATION_MSG: + processor.onScheduledInvalidationMsg((CalculatedFieldScheduledInvalidationMsg) msg); break; default: return false; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index de6c38b9b9..91122f3f95 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -76,7 +76,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final Map calculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFieldLinks = new HashMap<>(); - private final Map> checkForCalculatedFieldUpdateTasks = new ConcurrentHashMap<>(); + private final Map> cfInvalidationScheduledTasks = new ConcurrentHashMap<>(); private final CalculatedFieldProcessingService cfExecService; private final CalculatedFieldStateService cfStateService; @@ -115,8 +115,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware calculatedFields.clear(); entityIdCalculatedFields.clear(); entityIdCalculatedFieldLinks.clear(); - checkForCalculatedFieldUpdateTasks.values().forEach(future -> future.cancel(true)); - checkForCalculatedFieldUpdateTasks.clear(); + cfInvalidationScheduledTasks.values().forEach(future -> future.cancel(true)); + cfInvalidationScheduledTasks.clear(); ctx.stop(ctx.getSelf()); } @@ -147,7 +147,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); - scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); + scheduleCalculatedFieldInvalidationMsgIfNeeded(cfCtx); msg.getCallback().onSuccess(); } @@ -336,7 +336,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware boolean hasSchedulingConfigChanges = newCfCtx.hasSchedulingConfigChanges(oldCfCtx); if (hasSchedulingConfigChanges) { - cancelCfUpdateTaskIfExists(cfId, false); + cancelCfScheduledInvalidationTaskIfExists(cfId, false); } List newCfList = new CopyOnWriteArrayList<>(); @@ -379,7 +379,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); deleteLinks(cfCtx); - cancelCfUpdateTaskIfExists(cfId, true); + cancelCfScheduledInvalidationTaskIfExists(cfId, true); EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); @@ -404,12 +404,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } } - private void cancelCfUpdateTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) { - var existingTask = checkForCalculatedFieldUpdateTasks.remove(cfId); + private void cancelCfScheduledInvalidationTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) { + var existingTask = cfInvalidationScheduledTasks.remove(cfId); if (existingTask != null) { existingTask.cancel(false); - String reason = cfDeleted ? "removal" : "update"; - log.debug("[{}][{}] Cancelled check for update task due to CF " + reason + "!", tenantId, cfId); + String reason = cfDeleted ? "deletion" : "update"; + log.debug("[{}][{}] Cancelled scheduled invalidation task due to CF " + reason + "!", tenantId, cfId); } } @@ -515,7 +515,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) { EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); - scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); + scheduleCalculatedFieldInvalidationMsgIfNeeded(cfCtx); if (isProfileEntity(entityType)) { var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId); if (!entityIds.isEmpty()) { @@ -533,31 +533,31 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } } - private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) { + private void scheduleCalculatedFieldInvalidationMsgIfNeeded(CalculatedFieldCtx cfCtx) { CalculatedField cf = cfCtx.getCalculatedField(); CalculatedFieldConfiguration cfConfig = cf.getConfiguration(); if (!cfConfig.isScheduledUpdateEnabled()) { return; } - if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) { - log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId()); + if (cfInvalidationScheduledTasks.containsKey(cf.getId())) { + log.debug("[{}][{}] Scheduled invalidation task for CF already exists!", tenantId, cf.getId()); return; } long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getScheduledUpdateIntervalSec()); - var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx.getCfId()); + var scheduledMsg = new CalculatedFieldScheduledInvalidationMsg(tenantId, cfCtx.getCfId()); ScheduledFuture scheduledFuture = systemContext .schedulePeriodicMsgWithDelay(ctx, scheduledMsg, refreshDynamicSourceInterval, refreshDynamicSourceInterval); - checkForCalculatedFieldUpdateTasks.put(cf.getId(), scheduledFuture); - log.debug("[{}][{}] Scheduled check for update msg for CF!", tenantId, cf.getId()); + cfInvalidationScheduledTasks.put(cf.getId(), scheduledFuture); + log.debug("[{}][{}] Scheduled invalidation task for CF!", tenantId, cf.getId()); } - public void onScheduledCheckForUpdatesMsg(CalculatedFieldScheduledCheckForUpdatesMsg msg) { - log.debug("[{}] [{}] Processing CF scheduled update msg.", tenantId, msg.getCfId()); + public void onScheduledInvalidationMsg(CalculatedFieldScheduledInvalidationMsg msg) { + log.debug("[{}] [{}] Processing CF scheduled invalidation msg.", tenantId, msg.getCfId()); CalculatedFieldCtx cfCtx = calculatedFields.get(msg.getCfId()); if (cfCtx == null) { - log.debug("[{}][{}] Failed to find CF context, going to stop scheduler updates.", tenantId, msg.getCfId()); - cancelCfUpdateTaskIfExists(msg.getCfId(), true); + log.debug("[{}][{}] Failed to find CF context, going to stop scheduled invalidations for CF.", tenantId, msg.getCfId()); + cancelCfScheduledInvalidationTaskIfExists(msg.getCfId(), true); return; } EntityId entityId = cfCtx.getEntityId(); @@ -568,7 +568,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var multiCallback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); entityIds.forEach(id -> { if (isMyPartition(id, multiCallback)) { - updateCfWithDynamicSourceForEntity(id, cfCtx, multiCallback); + InitCfInvalidationForEntity(id, msg.getCfId(), multiCallback); } }); } else { @@ -576,14 +576,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } } else { if (isMyPartition(entityId, msg.getCallback())) { - updateCfWithDynamicSourceForEntity(entityId, cfCtx, msg.getCallback()); + InitCfInvalidationForEntity(entityId, msg.getCfId(), msg.getCallback()); } } } - private void updateCfWithDynamicSourceForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, TbCallback callback) { - log.debug("Pushing entity dynamic source refresh CF msg to specific actor [{}]", entityId); - getOrCreateActor(entityId).tell(new EntityCalculatedFieldCheckForUpdatesMsg(tenantId, cfCtx, callback)); + private void InitCfInvalidationForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { + log.debug("Pushing entity CF invalidation msg to specific actor [{}]", entityId); + getOrCreateActor(entityId).tell(new EntityCalculatedFieldMarkStateDirtyMsg(tenantId, cfId, callback)); } private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledInvalidationMsg.java similarity index 87% rename from application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java rename to application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledInvalidationMsg.java index 95d6b6759a..08a2394119 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledInvalidationMsg.java @@ -22,14 +22,14 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; @Data -public class CalculatedFieldScheduledCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg { +public class CalculatedFieldScheduledInvalidationMsg implements ToCalculatedFieldSystemMsg { private final TenantId tenantId; private final CalculatedFieldId cfId; @Override public MsgType getMsgType() { - return MsgType.CF_SCHEDULED_CHECK_FOR_UPDATES_MSG; + return MsgType.CF_SCHEDULED_INVALIDATION_MSG; } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldMarkStateDirtyMsg.java similarity index 80% rename from application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java rename to application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldMarkStateDirtyMsg.java index 908680c068..ef9864aa83 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldMarkStateDirtyMsg.java @@ -16,22 +16,22 @@ package org.thingsboard.server.actors.calculatedField; import lombok.Data; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; @Data -public class EntityCalculatedFieldCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg { +public class EntityCalculatedFieldMarkStateDirtyMsg implements ToCalculatedFieldSystemMsg { private final TenantId tenantId; - private final CalculatedFieldCtx cfCtx; + private final CalculatedFieldId cfId; private final TbCallback callback; @Override public MsgType getMsgType() { - return MsgType.CF_ENTITY_CHECK_FOR_UPDATES_MSG; + return MsgType.CF_ENTITY_MARK_STATE_DIRTY_MSG; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 847caccaff..86ed174485 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -34,6 +34,8 @@ public interface CalculatedFieldProcessingService { ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); + Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId); + Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 29c4809a75..dd4049241c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; +import org.thingsboard.server.common.data.cf.configuration.CFArgumentDynamicSourceType; import org.thingsboard.server.common.data.cf.configuration.GeofencingCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.GeofencingZoneGroupConfiguration; import org.thingsboard.server.common.data.cf.configuration.OutputType; @@ -90,6 +91,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -132,20 +134,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP Map> argFutures = new HashMap<>(); if (ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) { - var configuration = (GeofencingCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); - var zoneGroupConfigs = configuration.getGeofencingZoneGroupConfigurations(); - for (var entry : ctx.getArguments().entrySet()) { - switch (entry.getKey()) { - case ENTITY_ID_LATITUDE_ARGUMENT_KEY, ENTITY_ID_LONGITUDE_ARGUMENT_KEY -> - argFutures.put(entry.getKey(), fetchKvEntry(ctx.getTenantId(), resolveEntityId(entityId, entry), entry.getValue())); - default -> { - var zoneGroupConfiguration = zoneGroupConfigs.get(entry.getKey()); - var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry); - argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds -> - fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue(), zoneGroupConfiguration), calculatedFieldCallbackExecutor)); - } - } - } + fetchGeofencingCalculatedFieldArguments(ctx, entityId, argFutures, false); } else { for (var entry : ctx.getArguments().entrySet()) { var argEntityId = resolveEntityId(entityId, entry); @@ -156,22 +145,45 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP return Futures.whenAllComplete(argFutures.values()).call(() -> { var result = createStateByType(ctx); - result.updateState(ctx, argFutures.entrySet().stream() - .collect(Collectors.toMap( - Entry::getKey, // Keep the key as is - entry -> { - try { - // Resolve the future to get the value - return entry.getValue().get(); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e); - } - } - ))); + result.updateState(ctx, resolveArgumentFutures(argFutures)); return result; }, calculatedFieldCallbackExecutor); } + @Override + public Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) { + // only geofencing calculated fields supports dynamic arguments scheduled updates + if (!ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) { + return Map.of(); + } + Map> argFutures = new HashMap<>(); + fetchGeofencingCalculatedFieldArguments(ctx, entityId, argFutures, true); + return resolveArgumentFutures(argFutures); + } + + private void fetchGeofencingCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, Map> argFutures, boolean dynamicArgumentsOnly) { + var configuration = (GeofencingCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration(); + var zoneGroupConfigs = configuration.getGeofencingZoneGroupConfigurations(); + Set> entries = ctx.getArguments().entrySet(); + if (dynamicArgumentsOnly) { + entries = entries.stream() + .filter(entry -> CFArgumentDynamicSourceType.RELATION_QUERY.equals(entry.getValue().getRefDynamicSource())) + .collect(Collectors.toSet()); + } + for (var entry : entries) { + switch (entry.getKey()) { + case ENTITY_ID_LATITUDE_ARGUMENT_KEY, ENTITY_ID_LONGITUDE_ARGUMENT_KEY -> + argFutures.put(entry.getKey(), fetchKvEntry(ctx.getTenantId(), resolveEntityId(entityId, entry), entry.getValue())); + default -> { + var zoneGroupConfiguration = zoneGroupConfigs.get(entry.getKey()); + var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry); + argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds -> + fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue(), zoneGroupConfiguration), calculatedFieldCallbackExecutor)); + } + } + } + } + @Override public Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments) { Map> argFutures = new HashMap<>(); @@ -180,6 +192,10 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue()); argFutures.put(entry.getKey(), argValueFuture); } + return resolveArgumentFutures(argFutures); + } + + private Map resolveArgumentFutures(Map> argFutures) { return argFutures.entrySet().stream() .collect(Collectors.toMap( Entry::getKey, // Keep the key as is diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index eb87d375c5..fa7e628ab3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -35,13 +35,15 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected long latestTimestamp = -1; + private boolean dirty; + public BaseCalculatedFieldState(List requiredArguments) { this.requiredArguments = requiredArguments; this.arguments = new HashMap<>(); } public BaseCalculatedFieldState() { - this(new ArrayList<>(), new HashMap<>(), false, -1); + this(new ArrayList<>(), new HashMap<>(), false, -1, false); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 77e630baaa..bb7515d7f5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -47,6 +47,10 @@ public interface CalculatedFieldState { long getLatestTimestamp(); + void setDirty(boolean dirty); + + boolean isDirty(); + void setRequiredArguments(List requiredArguments); boolean updateState(CalculatedFieldCtx ctx, Map argumentValues); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java index f66c4cf9c3..a98d6b65b1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java @@ -46,13 +46,14 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { private List requiredArguments; private Map arguments; - - protected boolean sizeExceedsLimit; + private boolean sizeExceedsLimit; private long latestTimestamp = -1; + private boolean dirty; + public GeofencingCalculatedFieldState() { - this(new ArrayList<>(), new HashMap<>(), false, -1); + this(new ArrayList<>(), new HashMap<>(), false, -1, false); } public GeofencingCalculatedFieldState(List argNames) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index bfcd3f3071..fc0e2262bb 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -152,8 +152,8 @@ public enum MsgType { CF_ENTITY_INIT_CF_MSG, CF_ENTITY_DELETE_MSG, - CF_SCHEDULED_CHECK_FOR_UPDATES_MSG, - CF_ENTITY_CHECK_FOR_UPDATES_MSG; + CF_SCHEDULED_INVALIDATION_MSG, + CF_ENTITY_MARK_STATE_DIRTY_MSG; @Getter private final boolean ignoreOnStart;