diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index c1a86ac36f..7e5ea5b5ec 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -63,6 +64,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.asset.AssetService; @@ -173,13 +175,66 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override protected Map>> onAddedPartitions(Set addedPartitions) { - // TODO: implementation for cluster mode - return Map.of(); + var result = new HashMap>>(); + PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); + Map> tpiCalculatedFieldMap = new HashMap<>(); + + for (CalculatedField cf : cfs) { + TopicPartitionInfo tpi; + try { + tpi = partitionService.resolve(ServiceType.TB_CORE, cf.getTenantId(), cf.getId()); + } catch (Exception e) { + log.warn("Failed to resolve partition for CalculatedField [{}], tenant [{}]. Reason: {}", + cf.getId(), cf.getTenantId(), e.getMessage()); + continue; + } + if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId().getId()))) { + tpiCalculatedFieldMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(cf); + } + } + + for (var entry : tpiCalculatedFieldMap.entrySet()) { + for (List partition : Lists.partition(entry.getValue(), 1000)) { + log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size()); + var future = calculatedFieldExecutor.submit(() -> { + try { + for (CalculatedField cf : partition) { + if (EntityType.ASSET_PROFILE.equals(cf.getEntityId().getEntityType()) || EntityType.DEVICE_PROFILE.equals(cf.getEntityId().getEntityType())) { + getOrFetchFromDBProfileEntities(cf.getTenantId(), cf.getEntityId()) + .forEach(entityId -> restoreState(cf, entityId)); + } else { + restoreState(cf, cf.getEntityId()); + } + } + } catch (Throwable t) { + log.error("Unexpected exception while restoring CalculatedField states", t); + throw t; + } + }); + result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future); + } + } + return result; + } + + private void restoreState(CalculatedField cf, EntityId entityId) { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId()); + String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)); + + if (storedState != null) { + CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class); + calculatedFieldsCtx.putIfAbsent(cf.getId(), new CalculatedFieldCtx(cf, tbelInvokeService)); + states.put(ctxId, restoredCtx); + log.info("Restored state for CalculatedField [{}]", cf.getId()); + } else { + log.warn("No state found for CalculatedField [{}], entity [{}].", cf.getId(), entityId); + } } @Override protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) { - // TODO: implementation for cluster mode + calculatedFields.remove(entityId); + states.keySet().removeIf(ctxId -> ctxId.cfId().equals(entityId.getId())); } @Override @@ -213,8 +268,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas case ASSET_PROFILE, DEVICE_PROFILE -> { log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId); fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> { - getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> { - initializeStateForEntity(calculatedFieldCtx, assetId, commonArguments, callback); + getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> { + initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArguments, callback); }); }); }