improved ability to fetch common arguments for profile cfs

This commit is contained in:
IrynaMatveieva 2024-12-09 13:04:41 +02:00
parent 3038510150
commit 4fed2cd416

View File

@ -41,7 +41,6 @@ import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@ -89,7 +88,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
@ -117,8 +116,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, Set<EntityId>> profileEntities = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, List<KvEntry>> tenantStorage = new ConcurrentHashMap<>();
private final ConcurrentMap<CustomerId, List<KvEntry>> customerStorage = new ConcurrentHashMap<>();
private static final int MAX_LAST_RECORDS_VALUE = 1024;
@ -196,7 +193,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
case ASSET_PROFILE, DEVICE_PROFILE -> {
log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId);
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> initializeStateForEntity(calculatedFieldCtx, assetId, callback));
fetchCommonArguments(calculatedFieldCtx, callback, commonArguments -> {
getOrFetchFromDBProfileEntities(tenantId, entityId).forEach(assetId -> {
initializeStateForEntity(calculatedFieldCtx, assetId, commonArguments, callback);
});
});
}
default ->
throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields.");
@ -221,12 +222,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService));
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> {
if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) {
updateStorage(tenantId, entityId, Optional.of(entry.getValue()));
}
return ArgumentEntry.createSingleValueArgument(entry.getValue());
}));
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
EntityId cfEntityId = calculatedField.getEntityId();
switch (cfEntityId.getEntityType()) {
@ -371,33 +367,71 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
}
private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) {
Map<String, Argument> arguments = calculatedFieldCtx.getArguments();
private void fetchCommonArguments(CalculatedFieldCtx calculatedFieldCtx, TbCallback callback, Consumer<Map<String, ArgumentEntry>> onComplete) {
Map<String, ArgumentEntry> argumentValues = new HashMap<>();
AtomicInteger remaining = new AtomicInteger(arguments.size());
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedFieldCtx, entityId, argument), new FutureCallback<>() {
List<ListenableFuture<ArgumentEntry>> futures = new ArrayList<>();
calculatedFieldCtx.getArguments().forEach((key, argument) -> {
if (!EntityType.DEVICE_PROFILE.equals(argument.getEntityId().getEntityType()) &&
!EntityType.ASSET_PROFILE.equals(argument.getEntityId().getEntityType())) {
futures.add(Futures.transform(fetchKvEntry(calculatedFieldCtx.getTenantId(), argument.getEntityId(), argument),
result -> {
argumentValues.put(key, result);
return result;
}, calculatedFieldCallbackExecutor));
}
});
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(ArgumentEntry result) {
argumentValues.put(key, result);
if (remaining.decrementAndGet() == 0) {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
}
public void onSuccess(List<ArgumentEntry> results) {
onComplete.accept(argumentValues);
}
@Override
public void onFailure(Throwable t) {
log.warn("Failed to initialize state for entity: [{}]", entityId, t);
log.error("Failed to fetch common arguments", t);
callback.onFailure(t);
}
}, calculatedFieldCallbackExecutor));
}, calculatedFieldCallbackExecutor);
}
private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) {
initializeStateForEntity(calculatedFieldCtx, entityId, new HashMap<>(), callback);
}
private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> commonArguments, TbCallback callback) {
Map<String, ArgumentEntry> argumentValues = new HashMap<>(commonArguments);
List<ListenableFuture<ArgumentEntry>> futures = new ArrayList<>();
calculatedFieldCtx.getArguments().forEach((key, argument) -> {
if (!commonArguments.containsKey(key)) {
futures.add(Futures.transform(fetchArgumentValue(calculatedFieldCtx, entityId, argument),
result -> {
argumentValues.put(key, result);
return result;
}, calculatedFieldCallbackExecutor));
}
});
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(List<ArgumentEntry> results) {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
callback.onSuccess();
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to initialize state for entity: [{}]", entityId, t);
callback.onFailure(t);
}
}, calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) {
TenantId tenantId = calculatedFieldCtx.getTenantId();
EntityId argumentEntityId = argument.getEntityId();
if (EntityType.TENANT.equals(argumentEntityId.getEntityType()) || EntityType.CUSTOMER.equals(argumentEntityId.getEntityType())) {
return fetchFromStorage(tenantId, argumentEntityId, argument);
}
EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType())
? targetEntityId
: argumentEntityId;
@ -407,21 +441,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return fetchKvEntry(tenantId, entityId, argument);
}
private ListenableFuture<ArgumentEntry> fetchFromStorage(TenantId tenantId, EntityId entityId, Argument argument) {
List<KvEntry> kvEntries;
if (EntityType.TENANT.equals(entityId.getEntityType())) {
kvEntries = tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>());
} else {
kvEntries = customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>());
}
for (KvEntry kvEntry : kvEntries) {
if (kvEntry.getKey().equals(argument.getKey())) {
return Futures.immediateFuture(ArgumentEntry.createSingleValueArgument(kvEntry));
}
}
return fetchKvEntry(tenantId, entityId, argument);
}
private ListenableFuture<ArgumentEntry> fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) {
long currentTime = System.currentTimeMillis();
long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow();
@ -450,26 +469,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldExecutor);
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
};
return Futures.transform(kvEntryFuture, kvEntry -> {
if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) {
updateStorage(tenantId, entityId, kvEntry);
}
return ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null));
}, calculatedFieldExecutor);
}
private void updateStorage(TenantId tenantId, EntityId entityId, Optional<? extends KvEntry> kvEntry) {
kvEntry.ifPresent(entry -> {
List<KvEntry> kvEntries = switch (entityId.getEntityType()) {
case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>());
case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>());
default -> null;
};
if (kvEntries != null) {
kvEntries.removeIf(existingEntry -> existingEntry.getKey().equals(entry.getKey()));
kvEntries.add(entry);
}
});
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldExecutor);
}
private KvEntry createDefaultKvEntry(Argument argument) {