refactored defaultTbCalculatedFieldService methods

This commit is contained in:
IrynaMatveieva 2024-11-12 17:21:03 +02:00
parent b6b7af8003
commit 23009c9aaf
4 changed files with 90 additions and 110 deletions

View File

@ -89,18 +89,25 @@ public abstract class AbstractTbEntityService {
@Lazy @Lazy
private EntitiesVersionControlService vcService; private EntitiesVersionControlService vcService;
@Autowired @Autowired
@Lazy
protected AccessControlService accessControlService; protected AccessControlService accessControlService;
@Autowired @Autowired
@Lazy
protected TenantService tenantService; protected TenantService tenantService;
@Autowired @Autowired
@Lazy
protected AssetService assetService; protected AssetService assetService;
@Autowired @Autowired
@Lazy
protected DeviceService deviceService; protected DeviceService deviceService;
@Autowired @Autowired
@Lazy
protected AssetProfileService assetProfileService; protected AssetProfileService assetProfileService;
@Autowired @Autowired
@Lazy
protected DeviceProfileService deviceProfileService; protected DeviceProfileService deviceProfileService;
@Autowired @Autowired
@Lazy
protected EntityService entityService; protected EntityService entityService;
protected boolean isTestProfile() { protected boolean isTestProfile() {

View File

@ -15,17 +15,20 @@
*/ */
package org.thingsboard.server.service.entitiy.cf; package org.thingsboard.server.service.entitiy.cf;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@Data @Data
@Builder
public class CalculatedFieldCtx { public class CalculatedFieldCtx {
private final CalculatedFieldId calculatedFieldId; private CalculatedFieldId calculatedFieldId;
private final EntityId entityId; private EntityId entityId;
private final CalculatedFieldState state; private CalculatedFieldState state;
public CalculatedFieldCtx(CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState state) {
this.calculatedFieldId = calculatedFieldId;
this.entityId = entityId;
this.state = state;
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.entitiy.cf; package org.thingsboard.server.service.entitiy.cf;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
@ -38,7 +39,6 @@ import org.thingsboard.server.common.data.cf.BaseCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.AssetProfileId;
@ -48,8 +48,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.HasId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
@ -62,6 +61,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Operation;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -71,7 +71,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.service.Validator.validateEntityId; import static org.thingsboard.server.dao.service.Validator.validateEntityId;
@ -128,98 +127,6 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
} }
} }
private ListenableFuture<List<AttributeKvEntry>> fetchAttributesForEntity(TenantId tenantId, EntityId entityId, List<String> keys) {
return attributesService.find(tenantId, entityId, AttributeScope.SERVER_SCOPE, keys);
}
private ListenableFuture<List<TsKvEntry>> fetchTimeSeries(TenantId tenantId, EntityId entityId, List<String> keys) {
return timeseriesService.findLatest(tenantId, entityId, keys);
}
private ListenableFuture<Void> initializeStateFromFutures(TenantId tenantId, EntityId entityId, CalculatedField calculatedField, List<String> attributeKeys, List<String> timeSeriesKeys) {
ListenableFuture<List<AttributeKvEntry>> attributesFuture = fetchAttributesForEntity(tenantId, entityId, attributeKeys);
ListenableFuture<List<TsKvEntry>> timeSeriesFuture = fetchTimeSeries(tenantId, entityId, timeSeriesKeys);
ListenableFuture<List<Object>> combinedFuture = Futures.allAsList(attributesFuture, timeSeriesFuture);
return Futures.transform(combinedFuture, results -> {
List<AttributeKvEntry> attributes = (List<AttributeKvEntry>) results.get(0);
List<TsKvEntry> timeSeries = (List<TsKvEntry>) results.get(1);
initializeState(calculatedField, attributes, timeSeries);
return null;
}, MoreExecutors.directExecutor());
}
private void initializeState(CalculatedField calculatedField, List<AttributeKvEntry> attributes, List<TsKvEntry> timeSeries) {
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(calculatedField.getId(),
ctx -> new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), null));
CalculatedFieldState state = calculatedFieldCtx.getState();
if (state != null) {
String calculation = performCalculation(state.getArguments());
Map<String, String> updatedArguments = state.getArguments();
state = CalculatedFieldState.builder()
.arguments(updatedArguments)
.result(calculation)
.build();
} else {
// initial calculation
Map<String, BaseCalculatedFieldConfiguration.Argument> arguments = calculatedField.getConfiguration().getArguments();
Map<String, String> argumentValues = arguments.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> resolveArgumentValue(entry.getKey(), entry.getValue(), attributes, timeSeries)
));
String calculation = performCalculation(argumentValues);
state = CalculatedFieldState.builder()
.arguments(argumentValues)
.result(calculation)
.build();
}
calculatedFieldCtx = new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), state);
states.put(calculatedField.getId(), calculatedFieldCtx);
}
private String resolveArgumentValue(String key, BaseCalculatedFieldConfiguration.Argument argument,
List<AttributeKvEntry> attributes, List<TsKvEntry> timeSeries) {
String type = argument.getType();
String value = null;
if ("ATTRIBUTES".equals(type)) {
value = attributes.stream()
.filter(attribute -> attribute.getKey().equals(key))
.map(AttributeKvEntry::getValueAsString)
.findFirst()
.orElse(null);
} else if ("TIME_SERIES".equals(type)) {
value = timeSeries.stream()
.filter(tsKvEntry -> tsKvEntry.getKey().equals(key))
.map(TsKvEntry::getValueAsString)
.findFirst()
.orElse(null);
}
return value != null ? value : argument.getDefaultValue();
}
private <T extends EntityId> void initializeForProfile(TenantId tenantId, List<CalculatedFieldLink> links, CalculatedField cf, Iterable<T> profileIds) {
for (T profileId : profileIds) {
for (CalculatedFieldLink link : links) {
CalculatedFieldLinkConfiguration configuration = link.getConfiguration();
initializeStateFromFutures(tenantId, profileId, cf, configuration.getAttributes(), configuration.getTimeSeries());
}
}
}
@Override @Override
public void onCalculatedFieldAdded(TransportProtos.CalculatedFieldAddMsgProto proto, TbCallback callback) { public void onCalculatedFieldAdded(TransportProtos.CalculatedFieldAddMsgProto proto, TbCallback callback) {
try { try {
@ -232,21 +139,16 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
calculatedFields.put(calculatedFieldId, cf); calculatedFields.put(calculatedFieldId, cf);
calculatedFieldLinks.put(calculatedFieldId, links); calculatedFieldLinks.put(calculatedFieldId, links);
switch (entityId.getEntityType()) { switch (entityId.getEntityType()) {
case ASSET, DEVICE -> { case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, callback);
for (CalculatedFieldLink link : links) {
CalculatedFieldLinkConfiguration configuration = link.getConfiguration();
initializeStateFromFutures(tenantId, link.getEntityId(), cf, configuration.getAttributes(), configuration.getTimeSeries());
}
}
case ASSET_PROFILE -> { case ASSET_PROFILE -> {
PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink -> PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink ->
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize);
initializeForProfile(tenantId, links, cf, assetIds); assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, callback));
} }
case DEVICE_PROFILE -> { case DEVICE_PROFILE -> {
PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink -> PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink ->
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize);
initializeForProfile(tenantId, links, cf, deviceIds); deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, callback));
} }
default -> default ->
throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields.");
@ -359,7 +261,72 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
}; };
} }
private String performCalculation(Map<String, String> argumentValues) { private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, TbCallback callback) {
Map<String, BaseCalculatedFieldConfiguration.Argument> arguments = calculatedField.getConfiguration().getArguments();
Map<String, String> argumentValues = new HashMap<>();
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() {
@Override
public void onSuccess(Optional<? extends KvEntry> result) {
String value = result.map(KvEntry::getValueAsString).orElse(argument.getDefaultValue());
argumentValues.put(key, value);
}
@Override
public void onFailure(Throwable t) {
log.warn("Failed to fetch data for type: {}", argument.getType(), t);
callback.onFailure(t);
}
}, calculatedFieldCallbackExecutor));
updateOrInitializeState(calculatedField, argumentValues);
}
private ListenableFuture<Optional<? extends KvEntry>> fetchArgumentValue(TenantId tenantId, BaseCalculatedFieldConfiguration.Argument argument) {
return switch (argument.getType()) {
case "ATTRIBUTES" -> Futures.transform(
attributesService.find(tenantId, argument.getEntityId(), AttributeScope.SERVER_SCOPE, argument.getKey()),
result -> result.map(entry -> (KvEntry) entry),
MoreExecutors.directExecutor());
case "TIME_SERIES" -> Futures.transform(
timeseriesService.findLatest(tenantId, argument.getEntityId(), argument.getKey()),
result -> result.map(entry -> (KvEntry) entry),
MoreExecutors.directExecutor());
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
};
}
private void updateOrInitializeState(CalculatedField calculatedField, Map<String, String> argumentValues) {
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(calculatedField.getId(),
ctx -> new CalculatedFieldCtx(calculatedField.getId(), calculatedField.getEntityId(), null));
CalculatedFieldState state = calculatedFieldCtx.getState();
if (state != null) {
// calculation based on the previous data
String calculation = performCalculation(state.getArguments(), calculatedField.getConfiguration());
Map<String, String> updatedArguments = new HashMap<>(state.getArguments());
state = CalculatedFieldState.builder()
.arguments(updatedArguments)
.result(calculation)
.build();
} else {
// initial calculation
String calculation = performCalculation(argumentValues, calculatedField.getConfiguration());
state = CalculatedFieldState.builder()
.arguments(argumentValues)
.result(calculation)
.build();
}
calculatedFieldCtx.setState(state);
states.put(calculatedField.getId(), calculatedFieldCtx);
}
private String performCalculation(Map<String, String> argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) {
BaseCalculatedFieldConfiguration.Output output = calculatedFieldConfiguration.getOutput();
return "calculation"; return "calculation";
} }

View File

@ -41,6 +41,9 @@ public interface CalculatedFieldConfiguration {
Map<String, BaseCalculatedFieldConfiguration.Argument> getArguments(); Map<String, BaseCalculatedFieldConfiguration.Argument> getArguments();
@JsonIgnore
BaseCalculatedFieldConfiguration.Output getOutput();
@JsonIgnore @JsonIgnore
List<EntityId> getReferencedEntities(); List<EntityId> getReferencedEntities();