Added relation creation support

This commit is contained in:
dshvaika 2025-08-08 19:48:29 +03:00
parent 409328dbe3
commit 3643b54985
12 changed files with 135 additions and 65 deletions

View File

@ -321,33 +321,17 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
boolean stateSizeChecked = false;
try {
if (ctx.isInitialized() && state.isReady()) {
List<CalculatedFieldResult> calculationResults = state.performCalculation(ctx).get(systemContext.getCfCalculationResultTimeout(), TimeUnit.SECONDS);
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(systemContext.getCfCalculationResultTimeout(), TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeChecked = true;
if (state.isSizeOk()) {
if (calculationResults.isEmpty()) {
if (!calculationResult.isEmpty()) {
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
} else {
callback.onSuccess();
} else {
TbCallback effectiveCallback = calculationResults.size() > 1 ?
new MultipleTbCallback(calculationResults.size(), callback) : callback;
for (CalculatedFieldResult calculationResult : calculationResults) {
if (calculationResult.isEmpty()) {
effectiveCallback.onSuccess();
} else {
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, effectiveCallback);
}
}
}
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
if (calculationResults.isEmpty()) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId,
state.getArguments(), tbMsgId, tbMsgType, null, null);
} else {
for (CalculatedFieldResult calculationResult : calculationResults) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId,
state.getArguments(), tbMsgId, tbMsgType, calculationResult.getResultAsString(), null);
}
}
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, calculationResult.getResult().toString(), null);
}
}
} else {

View File

@ -136,7 +136,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onFieldInitMsg(CalculatedFieldInitMsg msg) throws CalculatedFieldException {
log.debug("[{}] Processing CF init message.", msg.getCf().getId());
var cf = msg.getCf();
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
var cfCtx = getCfCtx(cf);
try {
cfCtx.init();
} catch (Exception e) {
@ -297,7 +297,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess();
} else {
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
var cfCtx = getCfCtx(cf);
try {
cfCtx.init();
} catch (Exception e) {
@ -313,6 +313,10 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private CalculatedFieldCtx getCfCtx(CalculatedField cf) {
return new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService(), systemContext.getRelationService());
}
private void onCfUpdated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
var oldCfCtx = calculatedFields.get(cfId);
@ -324,7 +328,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess();
} else {
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
var newCfCtx = getCfCtx(newCf);
try {
newCfCtx.init();
} catch (Exception e) {

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@ -56,6 +57,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
private final CalculatedFieldService calculatedFieldService;
private final TbelInvokeService tbelInvokeService;
private final ApiLimitService apiLimitService;
private final RelationService relationService;
@Lazy
private final ActorSystemContext actorSystemContext;
@ -119,7 +121,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
if (ctx == null) {
CalculatedField calculatedField = getCalculatedField(calculatedFieldId);
if (calculatedField != null) {
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService);
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService, relationService);
calculatedFieldsCtx.put(calculatedFieldId, ctx);
log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx);
}

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
@ -68,13 +69,15 @@ public class CalculatedFieldCtx {
private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
private ThreadLocal<Expression> customExpression;
private RelationService relationService;
private boolean initialized;
private long maxDataPointsPerRollingArg;
private long maxStateSize;
private long maxSingleValueArgumentSize;
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) {
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService, RelationService relationService) {
this.calculatedField = calculatedField;
this.cfId = calculatedField.getId();
@ -102,6 +105,7 @@ public class CalculatedFieldCtx {
this.expression = configuration.getExpression();
this.useLatestTs = CalculatedFieldType.SIMPLE.equals(calculatedField.getType()) && ((SimpleCalculatedFieldConfiguration) configuration).isUseLatestTs();
this.tbelInvokeService = tbelInvokeService;
this.relationService = relationService;
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
this.maxStateSize = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes) * 1024;

View File

@ -55,7 +55,7 @@ public interface CalculatedFieldState {
boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues);
ListenableFuture<List<CalculatedFieldResult>> performCalculation(CalculatedFieldCtx ctx);
ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx);
@JsonIgnore
boolean isReady();

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
@ -25,13 +26,15 @@ import org.thingsboard.common.util.geo.Coordinates;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.GeofencingCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.GeofencingEvent;
import org.thingsboard.server.common.data.cf.configuration.GeofencingZoneGroupConfiguration;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.utils.CalculatedFieldUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -112,33 +115,93 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
return stateUpdated;
}
// TODO: Probably returning list of CalculatedFieldResult no needed anymore,
// since logic changed to use zone groups with telemetry prefix.
@Override
public ListenableFuture<List<CalculatedFieldResult>> performCalculation(CalculatedFieldCtx ctx) {
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
double latitude = (double) arguments.get(ENTITY_ID_LATITUDE_ARGUMENT_KEY).getValue();
double longitude = (double) arguments.get(ENTITY_ID_LONGITUDE_ARGUMENT_KEY).getValue();
Coordinates entityCoordinates = new Coordinates(latitude, longitude);
var configuration = (GeofencingCalculatedFieldConfiguration) ctx.getCalculatedField().getConfiguration();
Map<String, GeofencingZoneGroupConfiguration> geofencingZoneGroupConfigurations = configuration.getGeofencingZoneGroupConfigurations();
if (configuration.isTrackRelationToZones()) {
// TODO: currently creates relation to device profile if CF created for profile)
return calculateWithRelations(ctx, entityCoordinates, configuration);
}
return calculateWithoutRelations(ctx, entityCoordinates, configuration);
}
private ListenableFuture<CalculatedFieldResult> calculateWithRelations(
CalculatedFieldCtx ctx,
Coordinates entityCoordinates,
GeofencingCalculatedFieldConfiguration configuration) {
var geofencingZoneGroupConfigurations = configuration.getGeofencingZoneGroupConfigurations();
Map<EntityId, GeofencingEvent> zoneEventMap = new HashMap<>();
ObjectNode resultNode = JacksonUtil.newObjectNode();
getGeofencingArguments().forEach((argumentKey, argumentEntry) -> {
var zoneGroupConfig = geofencingZoneGroupConfigurations.get(argumentKey);
Set<GeofencingEvent> zoneEvents = argumentEntry.getZoneStates()
.values()
.stream()
.map(zoneState -> zoneState.evaluate(entityCoordinates))
.collect(Collectors.toSet());
aggregateZoneGroupEvent(zoneEvents)
.filter(geofencingEvent -> zoneGroupConfig.getReportEvents().contains(geofencingEvent))
.ifPresent(event ->
resultNode.put(zoneGroupConfig.getReportTelemetryPrefix() + "Event", event.name())
);
Set<GeofencingEvent> groupEvents = new HashSet<>();
argumentEntry.getZoneStates().forEach((zoneId, zoneState) -> {
GeofencingEvent event = zoneState.evaluate(entityCoordinates);
zoneEventMap.put(zoneId, event);
groupEvents.add(event);
});
return Futures.immediateFuture(List.of(new CalculatedFieldResult(ctx.getOutput().getType(), ctx.getOutput().getScope(), resultNode)));
aggregateZoneGroupEvent(groupEvents)
.filter(zoneGroupConfig.getReportEvents()::contains)
.ifPresent(geofencingGroupEvent ->
resultNode.put(zoneGroupConfig.getReportTelemetryPrefix() + "Event", geofencingGroupEvent.name()));
});
var result = calculationResult(ctx, resultNode);
List<ListenableFuture<Boolean>> relationFutures = zoneEventMap.entrySet().stream()
.filter(entry -> entry.getValue().isTransitionEvent())
.map(entry -> {
EntityRelation relation = toRelation(entry.getKey(), ctx, configuration);
return switch (entry.getValue()) {
case ENTERED -> ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), relation);
case LEFT -> ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation);
default -> throw new IllegalStateException("Unexpected transition event: " + entry.getValue());
};
})
.toList();
if (relationFutures.isEmpty()) {
return Futures.immediateFuture(result);
}
return Futures.whenAllComplete(relationFutures).call(() ->
new CalculatedFieldResult(ctx.getOutput().getType(), ctx.getOutput().getScope(), resultNode),
MoreExecutors.directExecutor());
}
private ListenableFuture<CalculatedFieldResult> calculateWithoutRelations(
CalculatedFieldCtx ctx,
Coordinates entityCoordinates,
GeofencingCalculatedFieldConfiguration configuration) {
var geofencingZoneGroupConfigurations = configuration.getGeofencingZoneGroupConfigurations();
ObjectNode resultNode = JacksonUtil.newObjectNode();
getGeofencingArguments().forEach((argumentKey, argumentEntry) -> {
var zoneGroupConfig = geofencingZoneGroupConfigurations.get(argumentKey);
Set<GeofencingEvent> groupEvents = argumentEntry.getZoneStates().values().stream()
.map(zs -> zs.evaluate(entityCoordinates))
.collect(Collectors.toSet());
aggregateZoneGroupEvent(groupEvents)
.filter(zoneGroupConfig.getReportEvents()::contains)
.ifPresent(e -> resultNode.put(
zoneGroupConfig.getReportTelemetryPrefix() + "Event",
e.name()));
});
return Futures.immediateFuture(calculationResult(ctx, resultNode));
}
private CalculatedFieldResult calculationResult(CalculatedFieldCtx ctx, ObjectNode resultNode) {
return new CalculatedFieldResult(ctx.getOutput().getType(), ctx.getOutput().getScope(), resultNode);
}
@Override
@ -196,4 +259,11 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
return Optional.empty();
}
private EntityRelation toRelation(EntityId zoneId, CalculatedFieldCtx ctx, GeofencingCalculatedFieldConfiguration configuration) {
return switch (configuration.getZoneRelationDirection()) {
case TO -> new EntityRelation(zoneId, ctx.getEntityId(), configuration.getZoneRelationType());
case FROM -> new EntityRelation(ctx.getEntityId(), zoneId, configuration.getZoneRelationType());
};
}
}

View File

@ -53,7 +53,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public ListenableFuture<List<CalculatedFieldResult>> performCalculation(CalculatedFieldCtx ctx) {
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
Map<String, TbelCfArg> arguments = new LinkedHashMap<>();
List<Object> args = new ArrayList<>(ctx.getArgNames().size() + 1);
args.add(new Object()); // first element is a ctx, but we will set it later;
@ -70,7 +70,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray());
Output output = ctx.getOutput();
return Futures.transform(resultFuture,
result -> List.of(new CalculatedFieldResult(output.getType(), output.getScope(), result)),
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
MoreExecutors.directExecutor()
);
}

View File

@ -52,7 +52,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
}
@Override
public ListenableFuture<List<CalculatedFieldResult>> performCalculation(CalculatedFieldCtx ctx) {
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
var expr = ctx.getCustomExpression().get();
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
@ -76,7 +76,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
Object result = formatResult(expressionResult, output.getDecimalsByDefault());
JsonNode outputResult = createResultJson(ctx.isUseLatestTs(), output.getName(), result);
return Futures.immediateFuture(List.of(new CalculatedFieldResult(output.getType(), output.getScope(), outputResult)));
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), outputResult));
}
private Object formatResult(double expressionResult, Integer decimals) {

View File

@ -78,7 +78,7 @@ public class ScriptCalculatedFieldStateTest {
@BeforeEach
void setUp() {
when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService);
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService, null);
ctx.init();
state = new ScriptCalculatedFieldState(ctx.getArgNames());
}
@ -125,10 +125,9 @@ public class ScriptCalculatedFieldStateTest {
void testPerformCalculation() throws ExecutionException, InterruptedException {
state.arguments = new HashMap<>(Map.of("deviceTemperature", deviceTemperatureArgEntry, "assetHumidity", assetHumidityArgEntry));
List<CalculatedFieldResult> resultList = state.performCalculation(ctx).get();
CalculatedFieldResult result = state.performCalculation(ctx).get();
assertThat(resultList).isNotNull().hasSize(1);
CalculatedFieldResult result = resultList.get(0);
assertThat(result).isNotNull();
Output output = getCalculatedFieldConfig().getOutput();
assertThat(result.getType()).isEqualTo(output.getType());
assertThat(result.getScope()).isEqualTo(output.getScope());

View File

@ -42,7 +42,6 @@ import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@ -72,7 +71,7 @@ public class SimpleCalculatedFieldStateTest {
@BeforeEach
void setUp() {
when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService);
ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService, null);
ctx.init();
state = new SimpleCalculatedFieldState(ctx.getArgNames());
}
@ -135,10 +134,9 @@ public class SimpleCalculatedFieldStateTest {
"key3", key3ArgEntry
));
List<CalculatedFieldResult> resultList = state.performCalculation(ctx).get();
CalculatedFieldResult result = state.performCalculation(ctx).get();
assertThat(resultList).isNotNull().hasSize(1);
CalculatedFieldResult result = resultList.get(0);
assertThat(result).isNotNull();
Output output = getCalculatedFieldConfig().getOutput();
assertThat(result.getType()).isEqualTo(output.getType());
assertThat(result.getScope()).isEqualTo(output.getScope());
@ -166,10 +164,9 @@ public class SimpleCalculatedFieldStateTest {
"key3", key3ArgEntry
));
List<CalculatedFieldResult> resultList = state.performCalculation(ctx).get();
CalculatedFieldResult result = state.performCalculation(ctx).get();
assertThat(resultList).isNotNull().hasSize(1);
CalculatedFieldResult result = resultList.get(0);
assertThat(result).isNotNull();
Output output = getCalculatedFieldConfig().getOutput();
assertThat(result.getType()).isEqualTo(output.getType());
assertThat(result.getScope()).isEqualTo(output.getScope());
@ -188,10 +185,9 @@ public class SimpleCalculatedFieldStateTest {
output.setDecimalsByDefault(3);
ctx.setOutput(output);
List<CalculatedFieldResult> resultList = state.performCalculation(ctx).get();
CalculatedFieldResult result = state.performCalculation(ctx).get();
assertThat(resultList).isNotNull().hasSize(1);
CalculatedFieldResult result = resultList.get(0);
assertThat(result).isNotNull();
assertThat(result.getType()).isEqualTo(output.getType());
assertThat(result.getScope()).isEqualTo(output.getScope());
assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("output", 49.546)));

View File

@ -19,6 +19,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import java.util.HashSet;
@ -40,8 +41,9 @@ public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldC
ENTITY_ID_LONGITUDE_ARGUMENT_KEY
);
private boolean trackRelationToZones;
private String zoneRelationType;
private boolean trackZoneRelations;
private EntitySearchDirection zoneRelationDirection;
private Map<String, GeofencingZoneGroupConfiguration> geofencingZoneGroupConfigurations;
@Override
@ -50,6 +52,7 @@ public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldC
}
// TODO: update validate method in PE version.
// Add relation tracking configuration validation
@Override
public void validate() {
if (arguments == null) {

View File

@ -15,8 +15,16 @@
*/
package org.thingsboard.server.common.data.cf.configuration;
import lombok.Getter;
@Getter
public enum GeofencingEvent {
ENTERED, LEFT, INSIDE, OUTSIDE;
ENTERED(true), LEFT(true), INSIDE(false), OUTSIDE(false);
private final boolean transitionEvent;
GeofencingEvent(boolean transitionEvent) {
this.transitionEvent = transitionEvent;
}
}