Added dirty updates support

This commit is contained in:
dshvaika 2025-08-08 15:14:03 +03:00
parent 589e159b54
commit 71f092c4e8
12 changed files with 111 additions and 90 deletions

View File

@ -75,8 +75,8 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
case CF_LINKED_TELEMETRY_MSG:
processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg);
break;
case CF_ENTITY_CHECK_FOR_UPDATES_MSG:
processor.process((EntityCalculatedFieldCheckForUpdatesMsg) msg);
case CF_ENTITY_MARK_STATE_DIRTY_MSG:
processor.process((EntityCalculatedFieldMarkStateDirtyMsg) msg);
break;
default:
return false;

View File

@ -228,29 +228,16 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException {
CalculatedFieldCtx cfCtx = msg.getCfCtx();
CalculatedFieldId cfId = cfCtx.getCfId();
log.debug("[{}][{}] Processing CF check for updates msg.", entityId, cfId);
CalculatedFieldState currentState = states.get(cfId);
try {
var stateFromDb = getStateFromDb(cfCtx);
if (currentState.equals(stateFromDb)) {
log.debug("[{}][{}] CF state is up-to-date.", entityId, cfId);
return;
}
states.put(cfId, stateFromDb);
if (stateFromDb.isSizeOk()) {
processStateIfReady(cfCtx, Collections.singletonList(cfId), stateFromDb, null, null, msg.getCallback());
public void process(EntityCalculatedFieldMarkStateDirtyMsg msg) throws CalculatedFieldException {
log.debug("[{}][{}] Processing entity CF invalidation msg.", entityId, msg.getCfId());
CalculatedFieldState currentState = states.get(msg.getCfId());
if (currentState == null) {
log.debug("[{}][{}] Failed to find CF state for entity.", entityId, msg.getCfId());
} else {
throw new RuntimeException(cfCtx.getSizeExceedsLimitMessage());
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
}
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build();
currentState.setDirty(true);
log.debug("[{}][{}] CF state marked as dirty.", entityId, msg.getCfId());
}
msg.getCallback().onSuccess();
}
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
@ -280,6 +267,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state == null) {
state = getOrInitState(ctx);
justRestored = true;
} else if (state.isDirty()) {
log.debug("[{}][{}] Going to update dirty CF state.", entityId, ctx.getCfId());
try {
Map<String, ArgumentEntry> dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId);
dynamicArgsFromDb.forEach(newArgValues::putIfAbsent);
state.setDirty(false);
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
if (state.isSizeOk()) {
if (state.updateState(ctx, newArgValues) || justRestored) {

View File

@ -91,8 +91,8 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
case CF_LINKED_TELEMETRY_MSG:
processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg);
break;
case CF_SCHEDULED_CHECK_FOR_UPDATES_MSG:
processor.onScheduledCheckForUpdatesMsg((CalculatedFieldScheduledCheckForUpdatesMsg) msg);
case CF_SCHEDULED_INVALIDATION_MSG:
processor.onScheduledInvalidationMsg((CalculatedFieldScheduledInvalidationMsg) msg);
break;
default:
return false;

View File

@ -76,7 +76,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final Map<CalculatedFieldId, CalculatedFieldCtx> calculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>();
private final Map<CalculatedFieldId, ScheduledFuture<?>> checkForCalculatedFieldUpdateTasks = new ConcurrentHashMap<>();
private final Map<CalculatedFieldId, ScheduledFuture<?>> cfInvalidationScheduledTasks = new ConcurrentHashMap<>();
private final CalculatedFieldProcessingService cfExecService;
private final CalculatedFieldStateService cfStateService;
@ -115,8 +115,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
calculatedFields.clear();
entityIdCalculatedFields.clear();
entityIdCalculatedFieldLinks.clear();
checkForCalculatedFieldUpdateTasks.values().forEach(future -> future.cancel(true));
checkForCalculatedFieldUpdateTasks.clear();
cfInvalidationScheduledTasks.values().forEach(future -> future.cancel(true));
cfInvalidationScheduledTasks.clear();
ctx.stop(ctx.getSelf());
}
@ -147,7 +147,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
scheduleCalculatedFieldInvalidationMsgIfNeeded(cfCtx);
msg.getCallback().onSuccess();
}
@ -336,7 +336,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
boolean hasSchedulingConfigChanges = newCfCtx.hasSchedulingConfigChanges(oldCfCtx);
if (hasSchedulingConfigChanges) {
cancelCfUpdateTaskIfExists(cfId, false);
cancelCfScheduledInvalidationTaskIfExists(cfId, false);
}
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
@ -379,7 +379,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);
deleteLinks(cfCtx);
cancelCfUpdateTaskIfExists(cfId, true);
cancelCfScheduledInvalidationTaskIfExists(cfId, true);
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
@ -404,12 +404,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private void cancelCfUpdateTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) {
var existingTask = checkForCalculatedFieldUpdateTasks.remove(cfId);
private void cancelCfScheduledInvalidationTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) {
var existingTask = cfInvalidationScheduledTasks.remove(cfId);
if (existingTask != null) {
existingTask.cancel(false);
String reason = cfDeleted ? "removal" : "update";
log.debug("[{}][{}] Cancelled check for update task due to CF " + reason + "!", tenantId, cfId);
String reason = cfDeleted ? "deletion" : "update";
log.debug("[{}][{}] Cancelled scheduled invalidation task due to CF " + reason + "!", tenantId, cfId);
}
}
@ -515,7 +515,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) {
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
scheduleCalculatedFieldInvalidationMsgIfNeeded(cfCtx);
if (isProfileEntity(entityType)) {
var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId);
if (!entityIds.isEmpty()) {
@ -533,31 +533,31 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) {
private void scheduleCalculatedFieldInvalidationMsgIfNeeded(CalculatedFieldCtx cfCtx) {
CalculatedField cf = cfCtx.getCalculatedField();
CalculatedFieldConfiguration cfConfig = cf.getConfiguration();
if (!cfConfig.isScheduledUpdateEnabled()) {
return;
}
if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) {
log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId());
if (cfInvalidationScheduledTasks.containsKey(cf.getId())) {
log.debug("[{}][{}] Scheduled invalidation task for CF already exists!", tenantId, cf.getId());
return;
}
long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getScheduledUpdateIntervalSec());
var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx.getCfId());
var scheduledMsg = new CalculatedFieldScheduledInvalidationMsg(tenantId, cfCtx.getCfId());
ScheduledFuture<?> scheduledFuture = systemContext
.schedulePeriodicMsgWithDelay(ctx, scheduledMsg, refreshDynamicSourceInterval, refreshDynamicSourceInterval);
checkForCalculatedFieldUpdateTasks.put(cf.getId(), scheduledFuture);
log.debug("[{}][{}] Scheduled check for update msg for CF!", tenantId, cf.getId());
cfInvalidationScheduledTasks.put(cf.getId(), scheduledFuture);
log.debug("[{}][{}] Scheduled invalidation task for CF!", tenantId, cf.getId());
}
public void onScheduledCheckForUpdatesMsg(CalculatedFieldScheduledCheckForUpdatesMsg msg) {
log.debug("[{}] [{}] Processing CF scheduled update msg.", tenantId, msg.getCfId());
public void onScheduledInvalidationMsg(CalculatedFieldScheduledInvalidationMsg msg) {
log.debug("[{}] [{}] Processing CF scheduled invalidation msg.", tenantId, msg.getCfId());
CalculatedFieldCtx cfCtx = calculatedFields.get(msg.getCfId());
if (cfCtx == null) {
log.debug("[{}][{}] Failed to find CF context, going to stop scheduler updates.", tenantId, msg.getCfId());
cancelCfUpdateTaskIfExists(msg.getCfId(), true);
log.debug("[{}][{}] Failed to find CF context, going to stop scheduled invalidations for CF.", tenantId, msg.getCfId());
cancelCfScheduledInvalidationTaskIfExists(msg.getCfId(), true);
return;
}
EntityId entityId = cfCtx.getEntityId();
@ -568,7 +568,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var multiCallback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
entityIds.forEach(id -> {
if (isMyPartition(id, multiCallback)) {
updateCfWithDynamicSourceForEntity(id, cfCtx, multiCallback);
InitCfInvalidationForEntity(id, msg.getCfId(), multiCallback);
}
});
} else {
@ -576,14 +576,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
} else {
if (isMyPartition(entityId, msg.getCallback())) {
updateCfWithDynamicSourceForEntity(entityId, cfCtx, msg.getCallback());
InitCfInvalidationForEntity(entityId, msg.getCfId(), msg.getCallback());
}
}
}
private void updateCfWithDynamicSourceForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, TbCallback callback) {
log.debug("Pushing entity dynamic source refresh CF msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new EntityCalculatedFieldCheckForUpdatesMsg(tenantId, cfCtx, callback));
private void InitCfInvalidationForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) {
log.debug("Pushing entity CF invalidation msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new EntityCalculatedFieldMarkStateDirtyMsg(tenantId, cfId, callback));
}
private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) {

View File

@ -22,14 +22,14 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
@Data
public class CalculatedFieldScheduledCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg {
public class CalculatedFieldScheduledInvalidationMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldId cfId;
@Override
public MsgType getMsgType() {
return MsgType.CF_SCHEDULED_CHECK_FOR_UPDATES_MSG;
return MsgType.CF_SCHEDULED_INVALIDATION_MSG;
}
}

View File

@ -16,22 +16,22 @@
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@Data
public class EntityCalculatedFieldCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg {
public class EntityCalculatedFieldMarkStateDirtyMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldCtx cfCtx;
private final CalculatedFieldId cfId;
private final TbCallback callback;
@Override
public MsgType getMsgType() {
return MsgType.CF_ENTITY_CHECK_FOR_UPDATES_MSG;
return MsgType.CF_ENTITY_MARK_STATE_DIRTY_MSG;
}
}

View File

@ -34,6 +34,8 @@ public interface CalculatedFieldProcessingService {
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
Map<String, ArgumentEntry> fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId);
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback);

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.CFArgumentDynamicSourceType;
import org.thingsboard.server.common.data.cf.configuration.GeofencingCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.GeofencingZoneGroupConfiguration;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
@ -90,6 +91,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -132,9 +134,43 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
if (ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) {
fetchGeofencingCalculatedFieldArguments(ctx, entityId, argFutures, false);
} else {
for (var entry : ctx.getArguments().entrySet()) {
var argEntityId = resolveEntityId(entityId, entry);
var argValueFuture = fetchKvEntry(ctx.getTenantId(), argEntityId, entry.getValue());
argFutures.put(entry.getKey(), argValueFuture);
}
}
return Futures.whenAllComplete(argFutures.values()).call(() -> {
var result = createStateByType(ctx);
result.updateState(ctx, resolveArgumentFutures(argFutures));
return result;
}, calculatedFieldCallbackExecutor);
}
@Override
public Map<String, ArgumentEntry> fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
// only geofencing calculated fields supports dynamic arguments scheduled updates
if (!ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) {
return Map.of();
}
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
fetchGeofencingCalculatedFieldArguments(ctx, entityId, argFutures, true);
return resolveArgumentFutures(argFutures);
}
private void fetchGeofencingCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, Map<String, ListenableFuture<ArgumentEntry>> argFutures, boolean dynamicArgumentsOnly) {
var configuration = (GeofencingCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
var zoneGroupConfigs = configuration.getGeofencingZoneGroupConfigurations();
for (var entry : ctx.getArguments().entrySet()) {
Set<Entry<String, Argument>> entries = ctx.getArguments().entrySet();
if (dynamicArgumentsOnly) {
entries = entries.stream()
.filter(entry -> CFArgumentDynamicSourceType.RELATION_QUERY.equals(entry.getValue().getRefDynamicSource()))
.collect(Collectors.toSet());
}
for (var entry : entries) {
switch (entry.getKey()) {
case ENTITY_ID_LATITUDE_ARGUMENT_KEY, ENTITY_ID_LONGITUDE_ARGUMENT_KEY ->
argFutures.put(entry.getKey(), fetchKvEntry(ctx.getTenantId(), resolveEntityId(entityId, entry), entry.getValue()));
@ -146,30 +182,6 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
}
}
}
} else {
for (var entry : ctx.getArguments().entrySet()) {
var argEntityId = resolveEntityId(entityId, entry);
var argValueFuture = fetchKvEntry(ctx.getTenantId(), argEntityId, entry.getValue());
argFutures.put(entry.getKey(), argValueFuture);
}
}
return Futures.whenAllComplete(argFutures.values()).call(() -> {
var result = createStateByType(ctx);
result.updateState(ctx, argFutures.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey, // Keep the key as is
entry -> {
try {
// Resolve the future to get the value
return entry.getValue().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e);
}
}
)));
return result;
}, calculatedFieldCallbackExecutor);
}
@Override
@ -180,6 +192,10 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue());
argFutures.put(entry.getKey(), argValueFuture);
}
return resolveArgumentFutures(argFutures);
}
private Map<String, ArgumentEntry> resolveArgumentFutures(Map<String, ListenableFuture<ArgumentEntry>> argFutures) {
return argFutures.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey, // Keep the key as is

View File

@ -35,13 +35,15 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected long latestTimestamp = -1;
private boolean dirty;
public BaseCalculatedFieldState(List<String> requiredArguments) {
this.requiredArguments = requiredArguments;
this.arguments = new HashMap<>();
}
public BaseCalculatedFieldState() {
this(new ArrayList<>(), new HashMap<>(), false, -1);
this(new ArrayList<>(), new HashMap<>(), false, -1, false);
}
@Override

View File

@ -47,6 +47,10 @@ public interface CalculatedFieldState {
long getLatestTimestamp();
void setDirty(boolean dirty);
boolean isDirty();
void setRequiredArguments(List<String> requiredArguments);
boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues);

View File

@ -46,13 +46,14 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
private List<String> requiredArguments;
private Map<String, ArgumentEntry> arguments;
protected boolean sizeExceedsLimit;
private boolean sizeExceedsLimit;
private long latestTimestamp = -1;
private boolean dirty;
public GeofencingCalculatedFieldState() {
this(new ArrayList<>(), new HashMap<>(), false, -1);
this(new ArrayList<>(), new HashMap<>(), false, -1, false);
}
public GeofencingCalculatedFieldState(List<String> argNames) {

View File

@ -152,8 +152,8 @@ public enum MsgType {
CF_ENTITY_INIT_CF_MSG,
CF_ENTITY_DELETE_MSG,
CF_SCHEDULED_CHECK_FOR_UPDATES_MSG,
CF_ENTITY_CHECK_FOR_UPDATES_MSG;
CF_SCHEDULED_INVALIDATION_MSG,
CF_ENTITY_MARK_STATE_DIRTY_MSG;
@Getter
private final boolean ignoreOnStart;