From 4fed2cd416b13b7f05159ba893f47dcd788fb13d Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 9 Dec 2024 13:04:41 +0200 Subject: [PATCH] improved ability to fetch common arguments for profile cfs --- ...efaultCalculatedFieldExecutionService.java | 120 +++++++++--------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 7ded80d013..f2629381b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -41,7 +41,6 @@ import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -89,7 +88,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; @@ -117,8 +116,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private final ConcurrentMap states = new ConcurrentHashMap<>(); private final ConcurrentMap> profileEntities = new ConcurrentHashMap<>(); - private final ConcurrentMap> tenantStorage = new ConcurrentHashMap<>(); - private final ConcurrentMap> customerStorage = new ConcurrentHashMap<>(); private static final int MAX_LAST_RECORDS_VALUE = 1024; @@ -196,7 +193,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } case ASSET_PROFILE, DEVICE_PROFILE -> { log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId); - getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> initializeStateForEntity(calculatedFieldCtx, assetId, callback)); + fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> { + getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> { + initializeStateForEntity(calculatedFieldCtx, assetId, commonArguments, callback); + }); + }); } default -> throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); @@ -221,12 +222,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService)); Map argumentValues = updatedTelemetry.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> { - if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) { - updateStorage(tenantId, entityId, Optional.of(entry.getValue())); - } - return ArgumentEntry.createSingleValueArgument(entry.getValue()); - })); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); EntityId cfEntityId = calculatedField.getEntityId(); switch (cfEntityId.getEntityType()) { @@ -371,33 +367,71 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); } - private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { - Map arguments = calculatedFieldCtx.getArguments(); + private void fetchCommonArguments(CalculatedFieldCtx calculatedFieldCtx, TbCallback callback, Consumer> onComplete) { Map argumentValues = new HashMap<>(); - AtomicInteger remaining = new AtomicInteger(arguments.size()); - arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedFieldCtx, entityId, argument), new FutureCallback<>() { + List> futures = new ArrayList<>(); + + calculatedFieldCtx.getArguments().forEach((key, argument) -> { + if (!EntityType.DEVICE_PROFILE.equals(argument.getEntityId().getEntityType()) && + !EntityType.ASSET_PROFILE.equals(argument.getEntityId().getEntityType())) { + futures.add(Futures.transform(fetchKvEntry(calculatedFieldCtx.getTenantId(), argument.getEntityId(), argument), + result -> { + argumentValues.put(key, result); + return result; + }, calculatedFieldCallbackExecutor)); + } + }); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { @Override - public void onSuccess(ArgumentEntry result) { - argumentValues.put(key, result); - if (remaining.decrementAndGet() == 0) { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); - } + public void onSuccess(List results) { + onComplete.accept(argumentValues); } @Override public void onFailure(Throwable t) { - log.warn("Failed to initialize state for entity: [{}]", entityId, t); + log.error("Failed to fetch common arguments", t); callback.onFailure(t); } - }, calculatedFieldCallbackExecutor)); + }, calculatedFieldCallbackExecutor); + } + + private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { + initializeStateForEntity(calculatedFieldCtx, entityId, new HashMap<>(), callback); + } + + private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map commonArguments, TbCallback callback) { + Map argumentValues = new HashMap<>(commonArguments); + List> futures = new ArrayList<>(); + + calculatedFieldCtx.getArguments().forEach((key, argument) -> { + if (!commonArguments.containsKey(key)) { + futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx, entityId, argument), + result -> { + argumentValues.put(key, result); + return result; + }, calculatedFieldCallbackExecutor)); + } + }); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(List results) { + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable t) { + log.error("Failed to initialize state for entity: [{}]", entityId, t); + callback.onFailure(t); + } + }, calculatedFieldCallbackExecutor); } private ListenableFuture fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) { TenantId tenantId = calculatedFieldCtx.getTenantId(); EntityId argumentEntityId = argument.getEntityId(); - if (EntityType.TENANT.equals(argumentEntityId.getEntityType()) || EntityType.CUSTOMER.equals(argumentEntityId.getEntityType())) { - return fetchFromStorage(tenantId, argumentEntityId, argument); - } EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) ? targetEntityId : argumentEntityId; @@ -407,21 +441,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return fetchKvEntry(tenantId, entityId, argument); } - private ListenableFuture fetchFromStorage(TenantId tenantId, EntityId entityId, Argument argument) { - List kvEntries; - if (EntityType.TENANT.equals(entityId.getEntityType())) { - kvEntries = tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>()); - } else { - kvEntries = customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>()); - } - for (KvEntry kvEntry : kvEntries) { - if (kvEntry.getKey().equals(argument.getKey())) { - return Futures.immediateFuture(ArgumentEntry.createSingleValueArgument(kvEntry)); - } - } - return fetchKvEntry(tenantId, entityId, argument); - } - private ListenableFuture fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) { long currentTime = System.currentTimeMillis(); long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow(); @@ -450,26 +469,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldExecutor); default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); }; - return Futures.transform(kvEntryFuture, kvEntry -> { - if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) { - updateStorage(tenantId, entityId, kvEntry); - } - return ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)); - }, calculatedFieldExecutor); - } - - private void updateStorage(TenantId tenantId, EntityId entityId, Optional kvEntry) { - kvEntry.ifPresent(entry -> { - List kvEntries = switch (entityId.getEntityType()) { - case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>()); - case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>()); - default -> null; - }; - if (kvEntries != null) { - kvEntries.removeIf(existingEntry -> existingEntry.getKey().equals(entry.getKey())); - kvEntries.add(entry); - } - }); + return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldExecutor); } private KvEntry createDefaultKvEntry(Argument argument) {