Scheduling exclusively during CF init and update & added simple relation check for fetch from DB
This commit is contained in:
		
							parent
							
								
									c8490080a1
								
							
						
					
					
						commit
						e22462521f
					
				@ -231,7 +231,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
 | 
				
			|||||||
    public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException {
 | 
					    public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException {
 | 
				
			||||||
        CalculatedFieldCtx cfCtx = msg.getCfCtx();
 | 
					        CalculatedFieldCtx cfCtx = msg.getCfCtx();
 | 
				
			||||||
        CalculatedFieldId cfId = cfCtx.getCfId();
 | 
					        CalculatedFieldId cfId = cfCtx.getCfId();
 | 
				
			||||||
        log.debug("[{}] [{}] Processing CF dynamic sources refresh msg.", entityId, cfId);
 | 
					        log.debug("[{}][{}] Processing CF check for updates msg.", entityId, cfId);
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            var state = updateStateFromDb(cfCtx);
 | 
					            var state = updateStateFromDb(cfCtx);
 | 
				
			||||||
            if (state.isSizeOk()) {
 | 
					            if (state.isSizeOk()) {
 | 
				
			||||||
 | 
				
			|||||||
@ -334,7 +334,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
				
			|||||||
                calculatedFields.put(newCf.getId(), newCfCtx);
 | 
					                calculatedFields.put(newCf.getId(), newCfCtx);
 | 
				
			||||||
                List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
 | 
					                List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (newCfCtx.hasSchedulingConfigChanges(oldCfCtx)) {
 | 
					                boolean hasSchedulingConfigChanges = newCfCtx.hasSchedulingConfigChanges(oldCfCtx);
 | 
				
			||||||
 | 
					                if (hasSchedulingConfigChanges) {
 | 
				
			||||||
                    cancelCfUpdateTaskIfExists(cfId, false);
 | 
					                    cancelCfUpdateTaskIfExists(cfId, false);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -359,7 +360,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
				
			|||||||
                // We use copy on write lists to safely pass the reference to another actor for the iteration.
 | 
					                // We use copy on write lists to safely pass the reference to another actor for the iteration.
 | 
				
			||||||
                // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
 | 
					                // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
 | 
				
			||||||
                var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
 | 
					                var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
 | 
				
			||||||
                if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
 | 
					                if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx) || hasSchedulingConfigChanges) {
 | 
				
			||||||
                    initCf(newCfCtx, callback, stateChanges);
 | 
					                    initCf(newCfCtx, callback, stateChanges);
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    callback.onSuccess();
 | 
					                    callback.onSuccess();
 | 
				
			||||||
@ -514,6 +515,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
				
			|||||||
    private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) {
 | 
					    private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) {
 | 
				
			||||||
        EntityId entityId = cfCtx.getEntityId();
 | 
					        EntityId entityId = cfCtx.getEntityId();
 | 
				
			||||||
        EntityType entityType = cfCtx.getEntityId().getEntityType();
 | 
					        EntityType entityType = cfCtx.getEntityId().getEntityType();
 | 
				
			||||||
 | 
					        scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
 | 
				
			||||||
        if (isProfileEntity(entityType)) {
 | 
					        if (isProfileEntity(entityType)) {
 | 
				
			||||||
            var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId);
 | 
					            var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId);
 | 
				
			||||||
            if (!entityIds.isEmpty()) {
 | 
					            if (!entityIds.isEmpty()) {
 | 
				
			||||||
@ -523,29 +525,25 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
				
			|||||||
                        initCfForEntity(id, cfCtx, forceStateReinit, multiCallback);
 | 
					                        initCfForEntity(id, cfCtx, forceStateReinit, multiCallback);
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                });
 | 
					                });
 | 
				
			||||||
                scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
 | 
					 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                callback.onSuccess();
 | 
					                callback.onSuccess();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        } else {
 | 
					        } else if (isMyPartition(entityId, callback)) {
 | 
				
			||||||
            if (isMyPartition(entityId, callback)) {
 | 
					 | 
				
			||||||
            initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
 | 
					            initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
 | 
				
			||||||
                scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) {
 | 
					    private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) {
 | 
				
			||||||
        CalculatedField cf = cfCtx.getCalculatedField();
 | 
					        CalculatedField cf = cfCtx.getCalculatedField();
 | 
				
			||||||
        CalculatedFieldConfiguration cfConfig = cf.getConfiguration();
 | 
					        CalculatedFieldConfiguration cfConfig = cf.getConfiguration();
 | 
				
			||||||
        if (!cfConfig.isDynamicRefreshEnabled()) {
 | 
					        if (!cfConfig.isScheduledUpdateEnabled()) {
 | 
				
			||||||
            return;
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) {
 | 
					        if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) {
 | 
				
			||||||
            log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId());
 | 
					            log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId());
 | 
				
			||||||
            return;
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getRefreshIntervalSec());
 | 
					        long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getScheduledUpdateIntervalSec());
 | 
				
			||||||
        var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx);
 | 
					        var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ScheduledFuture<?> scheduledFuture = systemContext
 | 
					        ScheduledFuture<?> scheduledFuture = systemContext
 | 
				
			||||||
 | 
				
			|||||||
@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
				
			|||||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
					import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
				
			||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
					import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
				
			||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
					import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
				
			||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
					import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
				
			||||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
					import org.thingsboard.server.common.msg.TbMsg;
 | 
				
			||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
					import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
				
			||||||
@ -286,9 +287,19 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        return switch (value.getRefDynamicSource()) {
 | 
					        return switch (value.getRefDynamicSource()) {
 | 
				
			||||||
            case RELATION_QUERY -> {
 | 
					            case RELATION_QUERY -> {
 | 
				
			||||||
                var relationQueryDynamicSourceConfiguration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration();
 | 
					                var configuration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration();
 | 
				
			||||||
                yield Futures.transform(relationService.findByQuery(tenantId, relationQueryDynamicSourceConfiguration.toEntityRelationsQuery(entityId)),
 | 
					                if (configuration.isSimpleRelation()) {
 | 
				
			||||||
                        relationQueryDynamicSourceConfiguration::resolveEntityIds, calculatedFieldCallbackExecutor);
 | 
					                    yield switch (configuration.getDirection()) {
 | 
				
			||||||
 | 
					                        case FROM ->
 | 
				
			||||||
 | 
					                                Futures.transform(relationService.findByFromAndTypeAsync(tenantId, entityId, configuration.getRelationType(), RelationTypeGroup.COMMON),
 | 
				
			||||||
 | 
					                                        configuration::resolveEntityIds, calculatedFieldCallbackExecutor);
 | 
				
			||||||
 | 
					                        case TO ->
 | 
				
			||||||
 | 
					                                Futures.transform(relationService.findByToAndTypeAsync(tenantId, entityId, configuration.getRelationType(), RelationTypeGroup.COMMON),
 | 
				
			||||||
 | 
					                                        configuration::resolveEntityIds, calculatedFieldCallbackExecutor);
 | 
				
			||||||
 | 
					                    };
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                yield Futures.transform(relationService.findByQuery(tenantId, configuration.toEntityRelationsQuery(entityId)),
 | 
				
			||||||
 | 
					                        configuration::resolveEntityIds, calculatedFieldCallbackExecutor);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -315,8 +315,8 @@ public class CalculatedFieldCtx {
 | 
				
			|||||||
    public boolean hasSchedulingConfigChanges(CalculatedFieldCtx other) {
 | 
					    public boolean hasSchedulingConfigChanges(CalculatedFieldCtx other) {
 | 
				
			||||||
        CalculatedFieldConfiguration thisConfig = calculatedField.getConfiguration();
 | 
					        CalculatedFieldConfiguration thisConfig = calculatedField.getConfiguration();
 | 
				
			||||||
        CalculatedFieldConfiguration otherConfig = other.calculatedField.getConfiguration();
 | 
					        CalculatedFieldConfiguration otherConfig = other.calculatedField.getConfiguration();
 | 
				
			||||||
        boolean refreshTriggerChanged = thisConfig.isDynamicRefreshEnabled() != otherConfig.isDynamicRefreshEnabled();
 | 
					        boolean refreshTriggerChanged = thisConfig.isScheduledUpdateEnabled() != otherConfig.isScheduledUpdateEnabled();
 | 
				
			||||||
        boolean refreshIntervalChanged = thisConfig.getRefreshIntervalSec() != otherConfig.getRefreshIntervalSec();
 | 
					        boolean refreshIntervalChanged = thisConfig.getScheduledUpdateIntervalSec() != otherConfig.getScheduledUpdateIntervalSec();
 | 
				
			||||||
        return refreshTriggerChanged || refreshIntervalChanged;
 | 
					        return refreshTriggerChanged || refreshIntervalChanged;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -63,11 +63,11 @@ public interface CalculatedFieldConfiguration {
 | 
				
			|||||||
    boolean hasDynamicSourceArguments();
 | 
					    boolean hasDynamicSourceArguments();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @JsonIgnore
 | 
					    @JsonIgnore
 | 
				
			||||||
    default boolean isDynamicRefreshEnabled() {
 | 
					    default boolean isScheduledUpdateEnabled() {
 | 
				
			||||||
        return hasDynamicSourceArguments() && getRefreshIntervalSec() > 0;
 | 
					        return hasDynamicSourceArguments() && getScheduledUpdateIntervalSec() > 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    default int getRefreshIntervalSec() {
 | 
					    default int getScheduledUpdateIntervalSec() {
 | 
				
			||||||
        return 0;
 | 
					        return 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -19,6 +19,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 | 
				
			|||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 | 
					import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 | 
				
			||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
					import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
				
			||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
					import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.id.EntityId;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@JsonTypeInfo(
 | 
					@JsonTypeInfo(
 | 
				
			||||||
        use = JsonTypeInfo.Id.NAME,
 | 
					        use = JsonTypeInfo.Id.NAME,
 | 
				
			||||||
@ -36,4 +38,10 @@ public interface CfArgumentDynamicSourceConfiguration {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    default void validate() {}
 | 
					    default void validate() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @JsonIgnore
 | 
				
			||||||
 | 
					    boolean isSimpleRelation();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @JsonIgnore
 | 
				
			||||||
 | 
					    EntityRelationsQuery toEntityRelationsQuery(EntityId rootEntityId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -30,7 +30,7 @@ public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldC
 | 
				
			|||||||
        return CalculatedFieldType.GEOFENCING;
 | 
					        return CalculatedFieldType.GEOFENCING;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public boolean isDynamicRefreshEnabled() {
 | 
					    public boolean isScheduledUpdateEnabled() {
 | 
				
			||||||
        return refreshIntervalSec > 0;
 | 
					        return refreshIntervalSec > 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -31,6 +31,7 @@ import java.util.List;
 | 
				
			|||||||
public class RelationQueryDynamicSourceConfiguration implements CfArgumentDynamicSourceConfiguration {
 | 
					public class RelationQueryDynamicSourceConfiguration implements CfArgumentDynamicSourceConfiguration {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private int maxLevel;
 | 
					    private int maxLevel;
 | 
				
			||||||
 | 
					    private boolean fetchLastLevelOnly;
 | 
				
			||||||
    private EntitySearchDirection direction;
 | 
					    private EntitySearchDirection direction;
 | 
				
			||||||
    private String relationType;
 | 
					    private String relationType;
 | 
				
			||||||
    private List<EntityType> profiles;
 | 
					    private List<EntityType> profiles;
 | 
				
			||||||
@ -40,9 +41,18 @@ public class RelationQueryDynamicSourceConfiguration implements CfArgumentDynami
 | 
				
			|||||||
        return CFArgumentDynamicSourceType.RELATION_QUERY;
 | 
					        return CFArgumentDynamicSourceType.RELATION_QUERY;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public boolean isSimpleRelation() {
 | 
				
			||||||
 | 
					        return maxLevel == 1 && (profiles == null || profiles.isEmpty());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
    public EntityRelationsQuery toEntityRelationsQuery(EntityId rootEntityId) {
 | 
					    public EntityRelationsQuery toEntityRelationsQuery(EntityId rootEntityId) {
 | 
				
			||||||
 | 
					        if (isSimpleRelation()) {
 | 
				
			||||||
 | 
					            throw new IllegalArgumentException("Entity relations query can't be created for a simple relation!");
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        var entityRelationsQuery = new EntityRelationsQuery();
 | 
					        var entityRelationsQuery = new EntityRelationsQuery();
 | 
				
			||||||
        entityRelationsQuery.setParameters(new RelationsSearchParameters(rootEntityId, direction, maxLevel, false));
 | 
					        entityRelationsQuery.setParameters(new RelationsSearchParameters(rootEntityId, direction, maxLevel, fetchLastLevelOnly));
 | 
				
			||||||
        entityRelationsQuery.setFilters(Collections.singletonList(new RelationEntityTypeFilter(relationType, profiles)));
 | 
					        entityRelationsQuery.setFilters(Collections.singletonList(new RelationEntityTypeFilter(relationType, profiles)));
 | 
				
			||||||
        return entityRelationsQuery;
 | 
					        return entityRelationsQuery;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user