From e22462521faab76c1c5e115cb8f6c8cbebd7d236 Mon Sep 17 00:00:00 2001 From: dshvaika Date: Fri, 1 Aug 2025 15:31:43 +0300 Subject: [PATCH] Scheduling exclusively during CF init and update & added simple relation check for fetch from DB --- .../CalculatedFieldEntityMessageProcessor.java | 2 +- ...CalculatedFieldManagerMessageProcessor.java | 18 ++++++++---------- ...efaultCalculatedFieldProcessingService.java | 17 ++++++++++++++--- .../cf/ctx/state/CalculatedFieldCtx.java | 4 ++-- .../CalculatedFieldConfiguration.java | 6 +++--- .../CfArgumentDynamicSourceConfiguration.java | 8 ++++++++ ...GeofencingCalculatedFieldConfiguration.java | 2 +- ...elationQueryDynamicSourceConfiguration.java | 12 +++++++++++- 8 files changed, 48 insertions(+), 21 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 68c0471cb7..d41feac542 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -231,7 +231,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException { CalculatedFieldCtx cfCtx = msg.getCfCtx(); CalculatedFieldId cfId = cfCtx.getCfId(); - log.debug("[{}] [{}] Processing CF dynamic sources refresh msg.", entityId, cfId); + log.debug("[{}][{}] Processing CF check for updates msg.", entityId, cfId); try { var state = updateStateFromDb(cfCtx); if (state.isSizeOk()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 03de43d08b..9f102c893f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -334,7 +334,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware calculatedFields.put(newCf.getId(), newCfCtx); List oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); - if (newCfCtx.hasSchedulingConfigChanges(oldCfCtx)) { + boolean hasSchedulingConfigChanges = newCfCtx.hasSchedulingConfigChanges(oldCfCtx); + if (hasSchedulingConfigChanges) { cancelCfUpdateTaskIfExists(cfId, false); } @@ -359,7 +360,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) var stateChanges = newCfCtx.hasStateChanges(oldCfCtx); - if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) { + if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx) || hasSchedulingConfigChanges) { initCf(newCfCtx, callback, stateChanges); } else { callback.onSuccess(); @@ -514,6 +515,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) { EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); + scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); if (isProfileEntity(entityType)) { var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId); if (!entityIds.isEmpty()) { @@ -523,29 +525,25 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware initCfForEntity(id, cfCtx, forceStateReinit, multiCallback); } }); - scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); } else { callback.onSuccess(); } - } else { - if (isMyPartition(entityId, callback)) { - initCfForEntity(entityId, cfCtx, forceStateReinit, callback); - scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); - } + } else if (isMyPartition(entityId, callback)) { + initCfForEntity(entityId, cfCtx, forceStateReinit, callback); } } private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) { CalculatedField cf = cfCtx.getCalculatedField(); CalculatedFieldConfiguration cfConfig = cf.getConfiguration(); - if (!cfConfig.isDynamicRefreshEnabled()) { + if (!cfConfig.isScheduledUpdateEnabled()) { return; } if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) { log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId()); return; } - long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getRefreshIntervalSec()); + long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getScheduledUpdateIntervalSec()); var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx); ScheduledFuture scheduledFuture = systemContext diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 424df50009..44544fbbdd 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -286,9 +287,19 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP } return switch (value.getRefDynamicSource()) { case RELATION_QUERY -> { - var relationQueryDynamicSourceConfiguration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration(); - yield Futures.transform(relationService.findByQuery(tenantId, relationQueryDynamicSourceConfiguration.toEntityRelationsQuery(entityId)), - relationQueryDynamicSourceConfiguration::resolveEntityIds, calculatedFieldCallbackExecutor); + var configuration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration(); + if (configuration.isSimpleRelation()) { + yield switch (configuration.getDirection()) { + case FROM -> + Futures.transform(relationService.findByFromAndTypeAsync(tenantId, entityId, configuration.getRelationType(), RelationTypeGroup.COMMON), + configuration::resolveEntityIds, calculatedFieldCallbackExecutor); + case TO -> + Futures.transform(relationService.findByToAndTypeAsync(tenantId, entityId, configuration.getRelationType(), RelationTypeGroup.COMMON), + configuration::resolveEntityIds, calculatedFieldCallbackExecutor); + }; + } + yield Futures.transform(relationService.findByQuery(tenantId, configuration.toEntityRelationsQuery(entityId)), + configuration::resolveEntityIds, calculatedFieldCallbackExecutor); } }; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 8124d78d3a..0fa653cf67 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -315,8 +315,8 @@ public class CalculatedFieldCtx { public boolean hasSchedulingConfigChanges(CalculatedFieldCtx other) { CalculatedFieldConfiguration thisConfig = calculatedField.getConfiguration(); CalculatedFieldConfiguration otherConfig = other.calculatedField.getConfiguration(); - boolean refreshTriggerChanged = thisConfig.isDynamicRefreshEnabled() != otherConfig.isDynamicRefreshEnabled(); - boolean refreshIntervalChanged = thisConfig.getRefreshIntervalSec() != otherConfig.getRefreshIntervalSec(); + boolean refreshTriggerChanged = thisConfig.isScheduledUpdateEnabled() != otherConfig.isScheduledUpdateEnabled(); + boolean refreshIntervalChanged = thisConfig.getScheduledUpdateIntervalSec() != otherConfig.getScheduledUpdateIntervalSec(); return refreshTriggerChanged || refreshIntervalChanged; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java index d090feeca7..3ee55f5d66 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java @@ -63,11 +63,11 @@ public interface CalculatedFieldConfiguration { boolean hasDynamicSourceArguments(); @JsonIgnore - default boolean isDynamicRefreshEnabled() { - return hasDynamicSourceArguments() && getRefreshIntervalSec() > 0; + default boolean isScheduledUpdateEnabled() { + return hasDynamicSourceArguments() && getScheduledUpdateIntervalSec() > 0; } - default int getRefreshIntervalSec() { + default int getScheduledUpdateIntervalSec() { return 0; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CfArgumentDynamicSourceConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CfArgumentDynamicSourceConfiguration.java index 3fe432917b..7af6283536 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CfArgumentDynamicSourceConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CfArgumentDynamicSourceConfiguration.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelationsQuery; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, @@ -36,4 +38,10 @@ public interface CfArgumentDynamicSourceConfiguration { default void validate() {} + @JsonIgnore + boolean isSimpleRelation(); + + @JsonIgnore + EntityRelationsQuery toEntityRelationsQuery(EntityId rootEntityId); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java index db6a5fd460..77369e6daa 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java @@ -30,7 +30,7 @@ public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldC return CalculatedFieldType.GEOFENCING; } - public boolean isDynamicRefreshEnabled() { + public boolean isScheduledUpdateEnabled() { return refreshIntervalSec > 0; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RelationQueryDynamicSourceConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RelationQueryDynamicSourceConfiguration.java index 219fd068cd..d3500606f6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RelationQueryDynamicSourceConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RelationQueryDynamicSourceConfiguration.java @@ -31,6 +31,7 @@ import java.util.List; public class RelationQueryDynamicSourceConfiguration implements CfArgumentDynamicSourceConfiguration { private int maxLevel; + private boolean fetchLastLevelOnly; private EntitySearchDirection direction; private String relationType; private List profiles; @@ -40,9 +41,18 @@ public class RelationQueryDynamicSourceConfiguration implements CfArgumentDynami return CFArgumentDynamicSourceType.RELATION_QUERY; } + @Override + public boolean isSimpleRelation() { + return maxLevel == 1 && (profiles == null || profiles.isEmpty()); + } + + @Override public EntityRelationsQuery toEntityRelationsQuery(EntityId rootEntityId) { + if (isSimpleRelation()) { + throw new IllegalArgumentException("Entity relations query can't be created for a simple relation!"); + } var entityRelationsQuery = new EntityRelationsQuery(); - entityRelationsQuery.setParameters(new RelationsSearchParameters(rootEntityId, direction, maxLevel, false)); + entityRelationsQuery.setParameters(new RelationsSearchParameters(rootEntityId, direction, maxLevel, fetchLastLevelOnly)); entityRelationsQuery.setFilters(Collections.singletonList(new RelationEntityTypeFilter(relationType, profiles))); return entityRelationsQuery; }