From 2a41c8b45123e43fc34def1a954e098298c7339e Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 20 Dec 2024 16:16:58 +0200 Subject: [PATCH] implemented logic to fetch telemetry if states were not fetched --- ...efaultCalculatedFieldExecutionService.java | 177 ++++++++++-------- .../ctx/state/SingleValueArgumentEntry.java | 10 + .../cf/ctx/state/TsRollingArgumentEntry.java | 13 +- 3 files changed, 121 insertions(+), 79 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index f2a09d0faf..1128a75008 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -81,12 +81,13 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -97,6 +98,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -200,11 +202,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas var future = calculatedFieldExecutor.submit(() -> { try { for (CalculatedField cf : partition) { - if (EntityType.ASSET_PROFILE.equals(cf.getEntityId().getEntityType()) || EntityType.DEVICE_PROFILE.equals(cf.getEntityId().getEntityType())) { - getOrFetchFromDBProfileEntities(cf.getTenantId(), cf.getEntityId()) + EntityId cfEntityId = cf.getEntityId(); + if (isProfileEntity(cfEntityId)) { + getOrFetchFromDBProfileEntities(cf.getTenantId(), cfEntityId) .forEach(entityId -> restoreState(cf, entityId)); } else { - restoreState(cf, cf.getEntityId()); + restoreState(cf, cfEntityId); } } } catch (Throwable t) { @@ -272,9 +275,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } case ASSET_PROFILE, DEVICE_PROFILE -> { log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId); - fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> { + Map commonArguments = calculatedFieldCtx.getArguments().entrySet().stream() + .filter(entry -> !isProfileEntity(entry.getValue().getEntityId())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + fetchArguments(tenantId, entityId, commonArguments, commonArgs -> { getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(targetEntityId -> { - initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArguments, callback); + initializeStateForEntity(calculatedFieldCtx, targetEntityId, commonArgs, callback); }); }); } @@ -473,7 +479,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas if (calculatedFieldState != null) { msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState)); } - clusterService.pushMsgToCore(tenantId, entityId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); + clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); } private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { @@ -556,35 +562,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); } - private void fetchCommonArguments(CalculatedFieldCtx calculatedFieldCtx, TbCallback callback, Consumer> onComplete) { - Map argumentValues = new HashMap<>(); - List> futures = new ArrayList<>(); - - calculatedFieldCtx.getArguments().forEach((key, argument) -> { - if (!EntityType.DEVICE_PROFILE.equals(argument.getEntityId().getEntityType()) && - !EntityType.ASSET_PROFILE.equals(argument.getEntityId().getEntityType())) { - futures.add(Futures.transform(fetchKvEntry(calculatedFieldCtx.getTenantId(), argument.getEntityId(), argument), - result -> { - argumentValues.put(key, result); - return result; - }, calculatedFieldCallbackExecutor)); - } - }); - - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { - @Override - public void onSuccess(List results) { - onComplete.accept(argumentValues); - } - - @Override - public void onFailure(Throwable t) { - log.error("Failed to fetch common arguments", t); - callback.onFailure(t); - } - }, calculatedFieldCallbackExecutor); - } - private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { initializeStateForEntity(calculatedFieldCtx, entityId, new HashMap<>(), callback); } @@ -595,7 +572,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldCtx.getArguments().forEach((key, argument) -> { if (!commonArguments.containsKey(key)) { - futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx, entityId, argument), + futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx.getTenantId(), entityId, argument), result -> { argumentValues.put(key, result); return result; @@ -618,10 +595,25 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } - private ListenableFuture fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) { - TenantId tenantId = calculatedFieldCtx.getTenantId(); + private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { + Map argumentValues = new HashMap<>(); + List> futures = new ArrayList<>(); + necessaryArguments.forEach((key, argument) -> { + futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument), + result -> { + argumentValues.put(key, result); + return result; + }, calculatedFieldCallbackExecutor)); + }); + return Futures.transform(Futures.allAsList(futures), results -> { + onComplete.accept(argumentValues); + return null; + }, calculatedFieldCallbackExecutor); + } + + private ListenableFuture fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) { EntityId argumentEntityId = argument.getEntityId(); - EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) + EntityId entityId = isProfileEntity(argumentEntityId) ? targetEntityId : argumentEntityId; return fetchKvEntry(tenantId, entityId, argument); @@ -654,11 +646,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); ListenableFuture> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); - return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? ArgumentEntry.createTsRollingArgument(Collections.emptyList()) : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); + return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); } private ListenableFuture transformSingleValueArgument(ListenableFuture> kvEntryFuture) { - return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldCallbackExecutor); + return Futures.transform(kvEntryFuture, kvEntry -> { + if (kvEntry.isPresent() && kvEntry.get().getValue() != null) { + return ArgumentEntry.createSingleValueArgument(kvEntry.get()); + } else { + return SingleValueArgumentEntry.EMPTY; + } + }, calculatedFieldCallbackExecutor); } private KvEntry createDefaultKvEntry(Argument argument) { @@ -676,48 +674,67 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues) { CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, this::fetchCalculatedFieldEntityState); + CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); + + Predicate> allArgsPresent = (args) -> + args.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && + !args.containsValue(SingleValueArgumentEntry.EMPTY) && !args.containsValue(TsRollingArgumentEntry.EMPTY); + + Consumer performUpdateState = (state) -> { + if (state.updateState(argumentValues)) { + calculatedFieldEntityCtx.setState(state); + TenantId tenantId = calculatedFieldCtx.getTenantId(); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); + if (tpi.isMyPartition()) { + states.put(entityCtxId, calculatedFieldEntityCtx); + rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); + } else { + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state); + } + + if (allArgsPresent.test(state.getArguments())) { + performCalculation(calculatedFieldCtx, state, entityId); + } + } + }; CalculatedFieldState state = calculatedFieldEntityCtx.getState(); + boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); + if (!allKeysPresent) { - if (state == null) { - state = createStateByType(calculatedFieldCtx.getCfType()); - } - if (state.updateState(argumentValues)) { - calculatedFieldEntityCtx.setState(state); - TenantId tenantId = calculatedFieldCtx.getTenantId(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldCtx.getCfId()); - if (tpi.isMyPartition()) { - states.put(entityCtxId, calculatedFieldEntityCtx); - rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); - } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state); - } - - boolean allArgsPresent = calculatedFieldCtx.getArguments().keySet().containsAll(state.getArguments().keySet()); - if (allArgsPresent) { - ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); - Futures.addCallback(resultFuture, new FutureCallback<>() { - @Override - public void onSuccess(CalculatedFieldResult result) { - if (result != null) { - pushMsgToRuleEngine(tenantId, entityId, result); - } - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t); - } - }, MoreExecutors.directExecutor()); - } + Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() + .filter(entry -> !argumentValues.containsKey(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll) + .addListener(() -> performUpdateState.accept(state), + calculatedFieldCallbackExecutor); + return; } + performUpdateState.accept(state); } - private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId) { + private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId) { + ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); + Futures.addCallback(resultFuture, new FutureCallback<>() { + @Override + public void onSuccess(CalculatedFieldResult result) { + if (result != null) { + pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result); + } + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t); + } + }, MoreExecutors.directExecutor()); + } + + private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); if (stateStr == null) { - return new CalculatedFieldEntityCtx(entityCtxId, null); + return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); } return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); } @@ -725,8 +742,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) { try { String type = calculatedFieldResult.getType(); - TbMsgType msgType = "ATTRIBUTE".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; - TbMsgMetaData md = "ATTRIBUTE".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; + TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; + TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload)); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); @@ -749,4 +766,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }; } + private boolean isProfileEntity(EntityId entityId) { + return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType()); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index 81a57580db..3f4fd5bdce 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -27,6 +27,8 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; @AllArgsConstructor public class SingleValueArgumentEntry implements ArgumentEntry { + public static final ArgumentEntry EMPTY = new SingleValueArgumentEntry(0); + private long ts; private Object value; @@ -39,6 +41,14 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.value = entry.getValue(); } + /** + * Internal constructor to create immutable SingleValueArgumentEntry.EMPTY + * */ + private SingleValueArgumentEntry(int ignored) { + this.ts = System.currentTimeMillis(); + this.value = null; + } + @Override public ArgumentType getType() { return ArgumentType.SINGLE_VALUE; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 49aae15ac1..4ffa391550 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -29,6 +29,8 @@ import java.util.TreeMap; @Slf4j public class TsRollingArgumentEntry implements ArgumentEntry { + public static final ArgumentEntry EMPTY = new TsRollingArgumentEntry(0); + private static final int MAX_ROLLING_ARGUMENT_ENTRY_SIZE = 1000; private TreeMap tsRecords = new TreeMap<>(); @@ -37,6 +39,13 @@ public class TsRollingArgumentEntry implements ArgumentEntry { addAllTsRecords(tsRecords); } + /** + * Internal constructor to create immutable TsRollingArgumentEntry.EMPTY + */ + private TsRollingArgumentEntry(int ignored) { + this.tsRecords = new TreeMap<>(); + } + @Override public ArgumentType getType() { return ArgumentType.TS_ROLLING; @@ -50,7 +59,9 @@ public class TsRollingArgumentEntry implements ArgumentEntry { @Override public boolean hasUpdatedValue(ArgumentEntry entry) { - return !tsRecords.containsKey(((SingleValueArgumentEntry) entry).getTs()); + return entry instanceof SingleValueArgumentEntry ? + !tsRecords.containsKey(((SingleValueArgumentEntry) entry).getTs()) : + !tsRecords.keySet().containsAll(((TsRollingArgumentEntry) entry).getTsRecords().keySet()); } @Override