changed Argument structure

This commit is contained in:
IrynaMatveieva 2025-01-10 16:58:26 +02:00
parent 77e99d15df
commit 6611f017c7
18 changed files with 195 additions and 122 deletions

View File

@ -38,6 +38,8 @@ public interface CalculatedFieldCache {
CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService);
List<CalculatedFieldCtx> getCalculatedFieldCtxsByEntityId(EntityId entityId, TbelInvokeService tbelInvokeService);
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);

View File

@ -61,6 +61,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>();
private final ConcurrentMap<CalculatedFieldId, CalculatedFieldCtx> calculatedFieldsCtx = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFieldCtxs = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, Set<EntityId>> profileEntities = new ConcurrentHashMap<>();
@Value("${calculatedField.initFetchPackSize:50000}")
@ -126,6 +127,13 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
return ctx;
}
@Override
public List<CalculatedFieldCtx> getCalculatedFieldCtxsByEntityId(EntityId entityId, TbelInvokeService tbelInvokeService) {
return getCalculatedFieldsByEntityId(entityId).stream()
.map(cf -> getCalculatedFieldCtx(cf.getId(), tbelInvokeService))
.toList();
}
@Override
public Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityProfileId) {
Set<EntityId> entities = profileEntities.get(entityProfileId);

View File

@ -39,8 +39,6 @@ import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
@ -273,7 +271,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
case ASSET_PROFILE, DEVICE_PROFILE -> {
log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId);
Map<String, Argument> commonArguments = calculatedFieldCtx.getArguments().entrySet().stream()
.filter(entry -> !isProfileEntity(entry.getValue().getEntityId()))
.filter(entry -> !isProfileEntity(entry.getValue().getRefEntityId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
fetchArguments(tenantId, entityId, commonArguments, commonArgs -> {
calculatedFieldCache.getEntitiesByProfile(tenantId, entityId).forEach(targetEntityId -> {
@ -341,30 +339,30 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
@Override
public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest) {
public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest request) {
try {
EntityId entityId = calculatedFieldTelemetryUpdateRequest.getEntityId();
EntityId entityId = request.getEntityId();
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId();
TenantId tenantId = request.getTenantId();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
if (tpi.isMyPartition()) {
processCalculatedFields(calculatedFieldTelemetryUpdateRequest, entityId);
processCalculatedFields(calculatedFieldTelemetryUpdateRequest, getProfileId(tenantId, entityId));
processCalculatedFields(request, entityId);
processCalculatedFields(request, getProfileId(tenantId, entityId));
Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStatesToUpdate = new HashMap<>();
processCalculatedFieldLinks(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate);
processCalculatedFieldLinks(request, tpiStatesToUpdate);
if (!tpiStatesToUpdate.isEmpty()) {
tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> {
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest, ctxIds);
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request, ctxIds);
clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
});
}
} else {
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest);
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request);
clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
}
@ -375,13 +373,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private void processCalculatedFields(CalculatedFieldTelemetryUpdateRequest request, EntityId cfTargetEntityId) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
if (cfTargetEntityId != null) {
calculatedFieldCache.getCalculatedFieldsByEntityId(cfTargetEntityId).forEach(cf -> {
CalculatedFieldLinkConfiguration linkConfiguration = cf.getConfiguration().getReferencedEntityConfig(cfTargetEntityId);
mapAndProcessUpdatedTelemetry(tenantId, entityId, cf.getId(), request, linkConfiguration);
calculatedFieldCache.getCalculatedFieldCtxsByEntityId(cfTargetEntityId, tbelInvokeService).forEach(ctx -> {
Map<String, KvEntry> updatedTelemetry = request.getMappedTelemetry(ctx);
if (!updatedTelemetry.isEmpty()) {
executeTelemetryUpdate(ctx, request.getEntityId(), request.getPreviousCalculatedFieldIds(), updatedTelemetry);
}
});
}
}
@ -393,56 +390,32 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId)
.forEach(link -> {
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
EntityId targetEntityId = calculatedFieldCache.getCalculatedField(calculatedFieldId).getEntityId();
CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
EntityId targetEntityId = ctx.getEntityId();
if (isProfileEntity(targetEntityId)) {
calculatedFieldCache.getEntitiesByProfile(tenantId, targetEntityId).forEach(entityByProfile -> {
processCalculatedFieldLink(request, entityByProfile, link, tpiStates);
processCalculatedFieldLink(request, entityByProfile, ctx, tpiStates);
});
} else {
processCalculatedFieldLink(request, targetEntityId, link, tpiStates);
processCalculatedFieldLink(request, targetEntityId, ctx, tpiStates);
}
});
}
private void processCalculatedFieldLink(CalculatedFieldTelemetryUpdateRequest request, EntityId targetEntity, CalculatedFieldLink link, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
TopicPartitionInfo targetEntityTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, targetEntity);
private void processCalculatedFieldLink(CalculatedFieldTelemetryUpdateRequest request, EntityId targetEntity, CalculatedFieldCtx ctx, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
TopicPartitionInfo targetEntityTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, request.getTenantId(), targetEntity);
if (targetEntityTpi.isMyPartition()) {
mapAndProcessUpdatedTelemetry(tenantId, entityId, calculatedFieldId, request, link.getConfiguration());
Map<String, KvEntry> updatedTelemetry = request.getMappedTelemetry(ctx);
if (!updatedTelemetry.isEmpty()) {
executeTelemetryUpdate(ctx, request.getEntityId(), request.getPreviousCalculatedFieldIds(), updatedTelemetry);
}
} else {
List<CalculatedFieldEntityCtxId> ctxIds = tpiStates.computeIfAbsent(targetEntityTpi, k -> new ArrayList<>());
ctxIds.add(new CalculatedFieldEntityCtxId(calculatedFieldId, targetEntity));
ctxIds.add(new CalculatedFieldEntityCtxId(ctx.getCfId(), targetEntity));
}
}
private void mapAndProcessUpdatedTelemetry(TenantId tenantId,
EntityId entityId,
CalculatedFieldId calculatedFieldId,
CalculatedFieldTelemetryUpdateRequest request,
CalculatedFieldLinkConfiguration linkConfiguration) {
Map<String, String> telemetryKeys = request.getTelemetryKeysFromLink(linkConfiguration);
Map<String, KvEntry> updatedTelemetry = mapTelemetryKeys(telemetryKeys, request.getKvEntries());
if (!updatedTelemetry.isEmpty()) {
List<CalculatedFieldId> previousCalculatedFieldIds = request.getPreviousCalculatedFieldIds();
executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, previousCalculatedFieldIds, updatedTelemetry);
}
}
private Map<String, KvEntry> mapTelemetryKeys(Map<String, String> telemetryKeys, List<? extends KvEntry> kvEntries) {
return kvEntries.stream()
.filter(entry -> telemetryKeys.containsKey(entry.getKey()))
.collect(Collectors.toMap(
entry -> telemetryKeys.getOrDefault(entry.getKey(), entry.getKey()),
entry -> entry,
(v1, v2) -> v1
));
}
@Override
public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) {
try {
@ -454,40 +427,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
proto.getLinksList().forEach(ctxIdProto -> {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
CalculatedFieldLinkConfiguration linkConfiguration
= calculatedFieldCache.getCalculatedField(calculatedFieldId).getConfiguration().getReferencedEntityConfig(entityId);
mapAndProcessUpdatedTelemetry(tenantId, entityId, calculatedFieldId, request, linkConfiguration);
Map<String, KvEntry> updatedTelemetry = request.getMappedTelemetry(ctx);
if (!updatedTelemetry.isEmpty()) {
executeTelemetryUpdate(ctx, entityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry);
}
});
} catch (Exception e) {
log.trace("Failed to process telemetry update msg: [{}]", proto, e);
}
}
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> previousCalculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId);
CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
private void executeTelemetryUpdate(CalculatedFieldCtx cfCtx, EntityId entityId, List<CalculatedFieldId> previousCalculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", cfCtx.getTenantId(), entityId, cfCtx.getCfId());
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
EntityId cfEntityId = calculatedField.getEntityId();
switch (cfEntityId.getEntityType()) {
case ASSET_PROFILE, DEVICE_PROFILE -> {
boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId);
if (isCommonEntity) {
calculatedFieldCache.getEntitiesByProfile(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, previousCalculatedFieldIds));
} else {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, previousCalculatedFieldIds);
}
}
default ->
updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, previousCalculatedFieldIds);
}
updateOrInitializeState(cfCtx, entityId, argumentValues, previousCalculatedFieldIds);
}
@Override
@ -533,8 +492,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} else {
clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder().setProfileEntityMsg(proto).build(), null);
}
} catch (Exception e) {
log.trace("Failed to process profile entity msg: [{}]", proto, e);
}
@ -616,11 +573,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream()
.anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null);
.anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null);
if (!allKeysPresent || requiresTsRollingUpdate) {
Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream()
.filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null))
.filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll)
@ -696,7 +653,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private ListenableFuture<ArgumentEntry> fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) {
EntityId argumentEntityId = argument.getEntityId();
EntityId argumentEntityId = argument.getRefEntityId();
EntityId entityId = isProfileEntity(argumentEntityId)
? targetEntityId
: argumentEntityId;
@ -704,17 +661,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
return switch (argument.getType()) {
return switch (argument.getRefEntityKey().getType()) {
case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument);
case ATTRIBUTE -> transformSingleValueArgument(
Futures.transform(
attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()),
attributesService.find(tenantId, entityId, argument.getRefEntityKey().getScope(), argument.getRefEntityKey().getKey()),
result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(createDefaultKvEntry(argument), System.currentTimeMillis(), 0L))),
calculatedFieldCallbackExecutor)
);
case TS_LATEST -> transformSingleValueArgument(
Futures.transform(
timeseriesService.findLatest(tenantId, entityId, argument.getKey()),
timeseriesService.findLatest(tenantId, entityId, argument.getRefEntityKey().getKey()),
result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument), 0L))),
calculatedFieldCallbackExecutor));
};
@ -736,7 +693,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
long startTs = currentTime - timeWindow;
int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit();
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor);
@ -826,7 +783,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private KvEntry createDefaultKvEntry(Argument argument) {
String key = argument.getKey();
String key = argument.getRefEntityKey().getKey();
String defaultValue = argument.getDefaultValue();
if (NumberUtils.isParsable(defaultValue)) {
return new DoubleDataEntry(key, Double.parseDouble(defaultValue));

View File

@ -22,13 +22,16 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.util.TbPair;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Data
public class CalculatedFieldCtx {
@ -38,7 +41,8 @@ public class CalculatedFieldCtx {
private EntityId entityId;
private CalculatedFieldType cfType;
private final Map<String, Argument> arguments;
private final List<String> argKeys;
private final Map<TbPair<EntityId, ReferencedEntityKey>, String> referencedEntityKeys;
private final List<String> argNames;
private Output output;
private String expression;
private TbelInvokeService tbelInvokeService;
@ -51,7 +55,12 @@ public class CalculatedFieldCtx {
this.cfType = calculatedField.getType();
CalculatedFieldConfiguration configuration = calculatedField.getConfiguration();
this.arguments = configuration.getArguments();
this.argKeys = new ArrayList<>(arguments.keySet());
this.referencedEntityKeys = arguments.entrySet().stream()
.collect(Collectors.toMap(
entry -> new TbPair<>(entry.getValue().getRefEntityId(), entry.getValue().getRefEntityKey()),
Map.Entry::getKey
));
this.argNames = new ArrayList<>(arguments.keySet());
this.output = configuration.getOutput();
this.expression = configuration.getExpression();
this.tbelInvokeService = tbelInvokeService;
@ -69,7 +78,7 @@ public class CalculatedFieldCtx {
tenantId,
tbelInvokeService,
expression,
argKeys.toArray(String[]::new)
argNames.toArray(String[]::new)
);
}

View File

@ -49,7 +49,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
tsRecords.entrySet().removeIf(tsRecord -> tsRecord.getKey() < System.currentTimeMillis() - argument.getTimeWindow());
}
});
Object[] args = ctx.getArgKeys().stream()
Object[] args = ctx.getArgNames().stream()
.map(key -> arguments.get(key).getValue())
.toArray();
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);

View File

@ -20,11 +20,16 @@ import lombok.Data;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -55,4 +60,24 @@ public class CalculatedFieldAttributeUpdateRequest implements CalculatedFieldTel
};
}
@Override
public Map<String, KvEntry> getMappedTelemetry(CalculatedFieldCtx ctx) {
Map<String, KvEntry> mappedKvEntries = new HashMap<>();
Map<TbPair<EntityId, ReferencedEntityKey>, String> referencedKeys = ctx.getReferencedEntityKeys();
kvEntries.forEach(entry -> {
String key = entry.getKey();
ReferencedEntityKey referencedEntityKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
String argName = referencedKeys.get(new TbPair<>(entityId, referencedEntityKey));
if (argName != null) {
mappedKvEntries.put(argName, entry);
}
});
return mappedKvEntries;
}
}

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.List;
import java.util.Map;
@ -36,4 +37,6 @@ public interface CalculatedFieldTelemetryUpdateRequest {
Map<String, String> getTelemetryKeysFromLink(CalculatedFieldLinkConfiguration linkConfiguration);
Map<String, KvEntry> getMappedTelemetry(CalculatedFieldCtx ctx);
}

View File

@ -19,11 +19,16 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -48,4 +53,30 @@ public class CalculatedFieldTimeSeriesUpdateRequest implements CalculatedFieldTe
return linkConfiguration.getTimeSeries();
}
@Override
public Map<String, KvEntry> getMappedTelemetry(CalculatedFieldCtx ctx) {
Map<String, KvEntry> mappedKvEntries = new HashMap<>();
Map<TbPair<EntityId, ReferencedEntityKey>, String> referencedKeys = ctx.getReferencedEntityKeys();
kvEntries.forEach(entry -> {
String key = entry.getKey();
ReferencedEntityKey tsLatestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
String argTsLatestName = referencedKeys.get(new TbPair<>(entityId, tsLatestKey));
if (argTsLatestName != null) {
mappedKvEntries.put(argTsLatestName, entry);
} else {
ReferencedEntityKey tsRollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null);
String argTsRollingName = referencedKeys.get(new TbPair<>(entityId, tsRollingKey));
if (argTsRollingName != null) {
mappedKvEntries.put(argTsRollingName, entry);
}
}
});
return mappedKvEntries;
}
}

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@ -179,6 +180,10 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
} else if (toRuleEngineMsg.hasCfTelemetryUpdateMsg()) {
calculatedFieldExecutionService.onTelemetryUpdateMsg(toRuleEngineMsg.getCfTelemetryUpdateMsg());
} else if (toRuleEngineMsg.hasEntityProfileUpdateMsg()) {
calculatedFieldExecutionService.onEntityProfileChangedMsg(toRuleEngineMsg.getEntityProfileUpdateMsg(), TbCallback.EMPTY);
} else if (toRuleEngineMsg.hasProfileEntityMsg()) {
calculatedFieldExecutionService.onProfileEntityMsg(toRuleEngineMsg.getProfileEntityMsg(), TbCallback.EMPTY);
} else {
callback.onSuccess();
}

View File

@ -143,7 +143,7 @@ public class CalculatedFieldControllerTest extends AbstractControllerTest {
Argument argument = new Argument();
argument.setEntityId(referencedEntityId);
argument.setType(ArgumentType.TS_LATEST);
argument.setKey("temperature");
argument.setRefEntityKey("temperature");
config.setArguments(Map.of("T", argument));

View File

@ -16,16 +16,15 @@
package org.thingsboard.server.common.data.cf.configuration;
import lombok.Data;
import org.thingsboard.server.common.data.AttributeScope;
import org.springframework.lang.Nullable;
import org.thingsboard.server.common.data.id.EntityId;
@Data
public class Argument {
private EntityId entityId;
private String key;
private ArgumentType type;
private AttributeScope scope;
@Nullable
private EntityId refEntityId;
private ReferencedEntityKey refEntityKey;
private String defaultValue;
private int limit;

View File

@ -50,6 +50,7 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
}
public BaseCalculatedFieldConfiguration(JsonNode config, EntityType entityType, UUID entityId) {
// BaseCalculatedFieldConfiguration calculatedFieldConfig = mapper.convertValue(config, BaseCalculatedFieldConfiguration.class);
BaseCalculatedFieldConfiguration calculatedFieldConfig = toCalculatedFieldConfig(config, entityType, entityId);
this.arguments = calculatedFieldConfig.getArguments();
this.expression = calculatedFieldConfig.getExpression();
@ -59,7 +60,7 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
@Override
public List<EntityId> getReferencedEntities() {
return arguments.values().stream()
.map(Argument::getEntityId)
.map(Argument::getRefEntityId)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@ -69,24 +70,24 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
CalculatedFieldLinkConfiguration linkConfiguration = new CalculatedFieldLinkConfiguration();
arguments.entrySet().stream()
.filter(entry -> entry.getValue().getEntityId().equals(entityId))
.filter(entry -> entry.getValue().getRefEntityId().equals(entityId))
.forEach(entry -> {
Argument targetArgument = entry.getValue();
String argumentKey = entry.getKey();
ReferencedEntityKey refEntityKey = entry.getValue().getRefEntityKey();
String argumentName = entry.getKey();
switch (targetArgument.getType()) {
switch (refEntityKey.getType()) {
case ATTRIBUTE -> {
switch (targetArgument.getScope()) {
switch (refEntityKey.getScope()) {
case CLIENT_SCOPE ->
linkConfiguration.getClientAttributes().put(targetArgument.getKey(), argumentKey);
linkConfiguration.getClientAttributes().put(refEntityKey.getKey(), argumentName);
case SERVER_SCOPE ->
linkConfiguration.getServerAttributes().put(targetArgument.getKey(), argumentKey);
linkConfiguration.getServerAttributes().put(refEntityKey.getKey(), argumentName);
case SHARED_SCOPE ->
linkConfiguration.getSharedAttributes().put(targetArgument.getKey(), argumentKey);
linkConfiguration.getSharedAttributes().put(refEntityKey.getKey(), argumentName);
}
}
case TS_LATEST, TS_ROLLING ->
linkConfiguration.getTimeSeries().put(targetArgument.getKey(), argumentKey);
linkConfiguration.getTimeSeries().put(refEntityKey.getKey(), argumentName);
}
});
@ -118,7 +119,7 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
ObjectNode argumentsNode = configNode.putObject("arguments");
arguments.forEach((key, argument) -> {
ObjectNode argumentNode = argumentsNode.putObject(key);
EntityId referencedEntityId = argument.getEntityId();
EntityId referencedEntityId = argument.getRefEntityId();
if (referencedEntityId != null) {
argumentNode.put("entityType", referencedEntityId.getEntityType().name());
argumentNode.put("entityId", referencedEntityId.getId().toString());
@ -126,9 +127,9 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
argumentNode.put("entityType", entityType.name());
argumentNode.put("entityId", entityId.toString());
}
argumentNode.put("key", argument.getKey());
argumentNode.put("type", String.valueOf(argument.getType()));
argumentNode.put("scope", String.valueOf(argument.getScope()));
// argumentNode.put("key", argument.getKey());
// argumentNode.put("type", String.valueOf(argument.getType()));
// argumentNode.put("scope", String.valueOf(argument.getScope()));
argumentNode.put("defaultValue", argument.getDefaultValue());
argumentNode.put("limit", String.valueOf(argument.getLimit()));
argumentNode.put("timeWindow", String.valueOf(argument.getTimeWindow()));
@ -165,19 +166,19 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
if (argumentNode.hasNonNull("entityType") && argumentNode.hasNonNull("entityId")) {
String referencedEntityType = argumentNode.get("entityType").asText();
UUID referencedEntityId = UUID.fromString(argumentNode.get("entityId").asText());
argument.setEntityId(EntityIdFactory.getByTypeAndUuid(referencedEntityType, referencedEntityId));
argument.setRefEntityId(EntityIdFactory.getByTypeAndUuid(referencedEntityType, referencedEntityId));
} else {
argument.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId));
argument.setRefEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId));
}
argument.setKey(argumentNode.get("key").asText());
// argument.setRefEntityKey(argumentNode.get("key").asText());
JsonNode type = argumentNode.get("type");
if (type != null && !type.isNull() && !type.asText().equals("null")) {
argument.setType(ArgumentType.valueOf(type.asText()));
}
JsonNode scope = argumentNode.get("scope");
if (scope != null && !scope.isNull() && !scope.asText().equals("null")) {
argument.setScope(AttributeScope.valueOf(scope.asText()));
}
// if (type != null && !type.isNull() && !type.asText().equals("null")) {
// argument.setType(ArgumentType.valueOf(type.asText()));
// }
// JsonNode scope = argumentNode.get("scope");
// if (scope != null && !scope.isNull() && !scope.asText().equals("null")) {
// argument.setScope(AttributeScope.valueOf(scope.asText()));
// }
if (argumentNode.hasNonNull("defaultValue")) {
argument.setDefaultValue(argumentNode.get("defaultValue").asText());
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.AttributeScope;
@Data
@AllArgsConstructor
public class ReferencedEntityKey {
private String key;
private ArgumentType type;
private AttributeScope scope;
}

View File

@ -22,6 +22,7 @@ import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
@ -95,6 +96,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
this.type = calculatedField.getType().name();
this.name = calculatedField.getName();
this.configurationVersion = calculatedField.getConfigurationVersion();
// this.configuration = JacksonUtil.valueToTree(calculatedField.getConfiguration());
this.configuration = calculatedField.getConfiguration().calculatedFieldConfigToJson(EntityType.valueOf(entityType), entityId);
this.version = calculatedField.getVersion();
if (calculatedField.getExternalId() != null) {
@ -112,6 +114,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
calculatedField.setName(name);
calculatedField.setConfigurationVersion(configurationVersion);
calculatedField.setConfiguration(readCalculatedFieldConfiguration(configuration, EntityType.valueOf(entityType), entityId));
// calculatedField.setConfiguration(JacksonUtil.treeToValue(configuration, CalculatedFieldConfiguration.class));
calculatedField.setVersion(version);
if (externalId != null) {
calculatedField.setExternalId(new CalculatedFieldId(externalId));

View File

@ -887,7 +887,7 @@ public class AssetServiceTest extends AbstractServiceTest {
Argument argument = new Argument();
argument.setEntityId(savedAsset.getId());
argument.setType(ArgumentType.TS_LATEST);
argument.setKey("temperature");
argument.setRefEntityKey("temperature");
config.setArguments(Map.of("T", argument));

View File

@ -156,7 +156,7 @@ public class CalculatedFieldServiceTest extends AbstractServiceTest {
Argument argument = new Argument();
argument.setEntityId(referencedEntityId);
argument.setType(ArgumentType.TS_LATEST);
argument.setKey("temperature");
argument.setRefEntityKey("temperature");
config.setArguments(Map.of("T", argument));

View File

@ -382,7 +382,7 @@ public class CustomerServiceTest extends AbstractServiceTest {
Argument argument = new Argument();
argument.setEntityId(savedCustomer.getId());
argument.setType(ArgumentType.TS_LATEST);
argument.setKey("temperature");
argument.setRefEntityKey("temperature");
config.setArguments(Map.of("T", argument));

View File

@ -1225,7 +1225,7 @@ public class DeviceServiceTest extends AbstractServiceTest {
Argument argument = new Argument();
argument.setEntityId(device.getId());
argument.setType(ArgumentType.TS_LATEST);
argument.setKey("temperature");
argument.setRefEntityKey("temperature");
config.setArguments(Map.of("T", argument));