From c1c9fb4f5aa9ee4fa5c369ba51436b194d908b1f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 31 Jan 2025 11:29:38 +0200 Subject: [PATCH] implemented calculated field debug events persistence --- .../server/actors/ActorSystemContext.java | 52 +++++++++++-------- ...CalculatedFieldEntityMessageProcessor.java | 42 ++++++++++----- ...alculatedFieldManagerMessageProcessor.java | 20 +++---- ...efaultCalculatedFieldExecutionService.java | 15 ++++-- .../ctx/state/BaseCalculatedFieldState.java | 19 ++++--- .../cf/ctx/state/CalculatedFieldCtx.java | 42 +++++++++++++-- .../cf/ctx/state/CalculatedFieldState.java | 4 ++ .../ctx/state/ScriptCalculatedFieldState.java | 6 +++ .../ctx/state/SimpleCalculatedFieldState.java | 15 ++---- .../cf/DefaultTbCalculatedFieldService.java | 10 ++++ common/proto/src/main/proto/queue.proto | 3 ++ ...efaultNativeCalculatedFieldRepository.java | 3 ++ .../engine/api/AttributesSaveRequest.java | 18 ++++++- .../engine/api/TimeseriesSaveRequest.java | 18 ++++++- .../engine/telemetry/TbMsgAttributesNode.java | 2 + .../engine/telemetry/TbMsgTimeseriesNode.java | 2 + 16 files changed, 194 insertions(+), 77 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 09488bfe4e..daa6744fd9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.event.RuleNodeDebugEvent; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; @@ -105,6 +106,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; @@ -129,6 +131,7 @@ import org.thingsboard.server.service.transport.TbCoreToTransportService; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -744,29 +747,34 @@ public class ActorSystemContext { } } - public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, TbMsg tbMsg, Throwable error) { - if (checkLimits(tenantId, tbMsg, error)) { - try { - CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder event = CalculatedFieldDebugEvent.builder() - .tenantId(tenantId) - .entityId(entityId.getId()) - .serviceId(getServiceId()) - .calculatedFieldId(calculatedFieldId) - .eventEntity(tbMsg.getOriginator()) - .msgId(tbMsg.getId()) - .msgType(tbMsg.getType()) - .arguments(JacksonUtil.toString(arguments)) - .result(tbMsg.getData()); - - if (error != null) { - event.error(toString(error)); - } - - ListenableFuture future = eventService.saveAsync(event.build()); - Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); - } catch (IllegalArgumentException ex) { - log.warn("Failed to persist calculated field debug message", ex); + public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) { + try { + CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() + .tenantId(tenantId) + .entityId(entityId.getId()) + .serviceId(getServiceId()) + .calculatedFieldId(calculatedFieldId) + .eventEntity(entityId); + if (tbMsgId != null) { + eventBuilder.msgId(tbMsgId); } + if (tbMsgType != null) { + eventBuilder.msgType(tbMsgType.name()); + } + if (arguments != null) { + eventBuilder.arguments(JacksonUtil.toString(arguments)); + } + if (result != null) { + eventBuilder.result(result); + } + if (error != null) { + eventBuilder.error(toString(error)); + } + + ListenableFuture future = eventService.saveAsync(eventBuilder.build()); + Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); + } catch (IllegalArgumentException ex) { + log.warn("Failed to persist calculated field debug message", ex); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index cea3cce791..6426be8a3c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -18,19 +18,18 @@ package org.thingsboard.server.actors.calculatedField; import com.google.common.util.concurrent.ListenableFuture; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; +import org.thingsboard.common.util.DebugModeUtil; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; -import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -53,9 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** @@ -111,9 +108,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM callback.onSuccess(CALLBACKS_PER_CF); } else { if (proto.getTsDataCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList())); + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } else if (proto.getAttrDataCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList())); + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); } else { callback.onSuccess(CALLBACKS_PER_CF); } @@ -136,27 +133,30 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM @SneakyThrows private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { - processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList())); + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } @SneakyThrows private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { - processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList())); + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); } @SneakyThrows private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, - Map newArgValues) { + Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) { if (newArgValues.isEmpty()) { callback.onSuccess(CALLBACKS_PER_CF); } CalculatedFieldState state = getOrInitState(ctx); if (state.updateState(newArgValues)) { - if (state.isReady()) { + if (state.isReady() && ctx.isInitialized()) { CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); cfIdList = new ArrayList<>(cfIdList); cfIdList.add(ctx.getCfId()); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); + if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResultMap()), null); + } } else { callback.onSuccess(); // State was updated but no calculation performed; } @@ -183,13 +183,27 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return state; } + private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) { + if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) { + return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB()); + } + return null; + } + + private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) { + if (!proto.getTbMsgType().isEmpty()) { + return TbMsgType.valueOf(proto.getTbMsgType()); + } + return null; + } + private Map mapToArguments(CalculatedFieldCtx ctx, List data) { return mapToArguments(ctx.getMainEntityArguments(), data); } private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List data) { var argNames = ctx.getLinkedEntityArguments().get(entityId); - if(argNames.isEmpty()) { + if (argNames.isEmpty()) { return Collections.emptyMap(); } return mapToArguments(argNames, data); @@ -221,7 +235,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List attrDataList) { var argNames = ctx.getLinkedEntityArguments().get(entityId); - if(argNames.isEmpty()) { + if (argNames.isEmpty()) { return Collections.emptyMap(); } return mapToArguments(argNames, scope, attrDataList); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 4bce7cb322..603ab96a8e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -16,16 +16,14 @@ package org.thingsboard.server.actors.calculatedField; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; +import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -38,16 +36,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; -import org.thingsboard.server.queue.discovery.HashPartitionService; -import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; @@ -101,6 +90,13 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onFieldInitMsg(CalculatedFieldInitMsg msg) { var cf = msg.getCf(); var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + try { + cfCtx.init(); + } catch (Exception e) { + if (DebugModeUtil.isDebugAllAvailable(cf)) { + systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e); + } + } calculatedFields.put(cf.getId(), cfCtx); // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 9c8aafc47d..8f948aa265 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -833,7 +833,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); + CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); List entries = request.getEntries(); List versions = result.getVersions(); for (int i = 0; i < entries.size(); i++) { @@ -849,7 +849,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); + CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); List entries = request.getEntries(); for (int i = 0; i < entries.size(); i++) { @@ -862,7 +862,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return msg.build(); } - private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds) { + private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); @@ -878,6 +878,15 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + if (tbMsgId != null) { + telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits()); + telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits()); + } + + if (tbMsgType != null) { + telemetryMsg.setTbMsgType(tbMsgType.name()); + } + return telemetryMsg; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index d623043cee..f5f681b505 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,19 +15,18 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import lombok.NoArgsConstructor; + import java.util.HashMap; import java.util.List; import java.util.Map; +@NoArgsConstructor public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected List requiredArguments; protected Map arguments; - public BaseCalculatedFieldState() { - this.arguments = new HashMap<>(); - } - public BaseCalculatedFieldState(List requiredArguments) { this.requiredArguments = requiredArguments; this.arguments = new HashMap<>(); @@ -35,7 +34,12 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { @Override public Map getArguments() { - return this.arguments; + return arguments; + } + + @Override + public List getRequiredArguments() { + return requiredArguments; } @Override @@ -53,7 +57,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { if (existingEntry == null) { validateNewEntry(newEntry); - arguments.put(key, newEntry.copy()); + arguments.put(key, newEntry); stateUpdated = true; } else { stateUpdated = existingEntry.updateEntry(newEntry); @@ -70,7 +74,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { !arguments.containsValue(TsRollingArgumentEntry.EMPTY); } - protected void validateNewEntry(ArgumentEntry newEntry) { - } + protected abstract void validateNewEntry(ArgumentEntry newEntry); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 42536188ce..33421a7d10 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -17,6 +17,8 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.Data; import net.objecthunter.exp4j.Expression; +import net.objecthunter.exp4j.ExpressionBuilder; +import org.mvel2.MVEL; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -33,7 +35,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import java.util.ArrayList; @@ -45,6 +46,8 @@ import java.util.stream.Collectors; @Data public class CalculatedFieldCtx { + private CalculatedField calculatedField; + private CalculatedFieldId cfId; private TenantId tenantId; private EntityId entityId; @@ -61,7 +64,11 @@ public class CalculatedFieldCtx { private CalculatedFieldScriptEngine calculatedFieldScriptEngine; private ThreadLocal customExpression; + private boolean initialized; + public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { + this.calculatedField = calculatedField; + this.cfId = calculatedField.getId(); this.tenantId = calculatedField.getTenantId(); this.entityId = calculatedField.getEntityId(); @@ -88,10 +95,28 @@ public class CalculatedFieldCtx { this.output = configuration.getOutput(); this.expression = configuration.getExpression(); this.tbelInvokeService = tbelInvokeService; - if (CalculatedFieldType.SCRIPT.equals(calculatedField.getType())) { - this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); + } + + public void init() { + if (CalculatedFieldType.SCRIPT.equals(cfType)) { + try { + this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); + initialized = true; + } catch (Exception e) { + throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e); + } } else { - this.customExpression = new ThreadLocal<>(); + if (isValidExpression(expression)) { + this.customExpression = ThreadLocal.withInitial(() -> + new ExpressionBuilder(expression) + .implicitMultiplication(true) + .variables(this.arguments.keySet()) + .build() + ); + initialized = true; + } else { + throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax."); + } } } @@ -108,6 +133,15 @@ public class CalculatedFieldCtx { ); } + private boolean isValidExpression(String expression) { + try { + MVEL.compileExpression(expression); + return true; + } catch (Exception e) { + return false; + } + } + public boolean matches(List values, AttributeScope scope) { return matchesAttributes(mainEntityArguments, values, scope); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 4b7918cc03..173d299da6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.service.cf.CalculatedFieldResult; +import java.util.List; import java.util.Map; @JsonTypeInfo( @@ -40,9 +41,12 @@ public interface CalculatedFieldState { Map getArguments(); + List getRequiredArguments(); + boolean updateState(Map argumentValues); ListenableFuture performCalculation(CalculatedFieldCtx ctx); + @JsonIgnore boolean isReady(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 290fd95370..a3c7efb388 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; @@ -31,6 +32,7 @@ import java.util.TreeMap; @Data @Slf4j +@NoArgsConstructor public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { public ScriptCalculatedFieldState(List requiredArguments) { @@ -42,6 +44,10 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { return CalculatedFieldType.SCRIPT; } + @Override + protected void validateNewEntry(ArgumentEntry newEntry) { + } + @Override public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { arguments.forEach((key, argumentEntry) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index 59b3009bb4..8b8fe6e8c7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -18,8 +18,7 @@ package org.thingsboard.server.service.cf.ctx.state; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; -import net.objecthunter.exp4j.Expression; -import net.objecthunter.exp4j.ExpressionBuilder; +import lombok.NoArgsConstructor; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.service.cf.CalculatedFieldResult; @@ -28,6 +27,7 @@ import java.util.List; import java.util.Map; @Data +@NoArgsConstructor public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { public SimpleCalculatedFieldState(List requiredArguments) { @@ -48,16 +48,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { @Override public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { - String expression = ctx.getExpression(); - ThreadLocal customExpression = ctx.getCustomExpression(); - var expr = customExpression.get(); - if (expr == null) { - expr = new ExpressionBuilder(expression) - .implicitMultiplication(true) - .variables(this.arguments.keySet()) - .build(); - customExpression.set(expr); - } + var expr = ctx.getCustomExpression().get(); for (Map.Entry entry : this.arguments.entrySet()) { try { diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java index c8c589691b..184092720c 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java @@ -58,6 +58,10 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp ActionType actionType = calculatedField.getId() == null ? ActionType.ADDED : ActionType.UPDATED; TenantId tenantId = calculatedField.getTenantId(); try { + if (ActionType.UPDATED.equals(actionType)) { + CalculatedField existingCf = calculatedFieldService.findById(tenantId, calculatedField.getId()); + checkForEntityChange(existingCf, calculatedField); + } checkCalculatedFieldNumber(tenantId, calculatedField.getEntityId()); checkEntityExistence(tenantId, calculatedField.getEntityId()); checkArgumentSize(calculatedField.getConfiguration()); @@ -98,6 +102,12 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp } } + private void checkForEntityChange(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) { + if (!oldCalculatedField.getEntityId().equals(newCalculatedField.getEntityId())) { + throw new IllegalArgumentException("Changing the calculated field target entity after initialization is prohibited."); + } + } + private void checkEntityExistence(TenantId tenantId, EntityId entityId) { switch (entityId.getEntityType()) { case ASSET, DEVICE, ASSET_PROFILE, DEVICE_PROFILE -> diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 69b912120e..53045fc147 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -799,6 +799,9 @@ message CalculatedFieldTelemetryMsgProto { repeated TsKvProto tsData = 9; AttributeScopeProto scope = 10; repeated AttributeValueProto attrData = 11; + int64 tbMsgIdMSB = 12; + int64 tbMsgIdLSB = 13; + string tbMsgType = 14; } message CalculatedFieldLinkedTelemetryMsgProto { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java index fcbf4d4dd0..daca6d5056 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.debug.DebugSettings; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldLinkId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -78,6 +79,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF int configurationVersion = (int) row.get("configuration_version"); JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration")); long version = row.get("version") != null ? (long) row.get("version") : 0; + String debugSettings = (String) row.get("debug_settings"); Object externalIdObj = row.get("external_id"); CalculatedField calculatedField = new CalculatedField(); @@ -90,6 +92,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF calculatedField.setConfigurationVersion(configurationVersion); calculatedField.setConfiguration(JacksonUtil.treeToValue(configuration, CalculatedFieldConfiguration.class)); calculatedField.setVersion(version); + calculatedField.setDebugSettings(JacksonUtil.fromString(debugSettings, DebugSettings.class)); calculatedField.setExternalId(externalIdObj != null ? new CalculatedFieldId(UUID.fromString((String) externalIdObj)) : null); return calculatedField; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index 406b2d0d5c..74f3468f49 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -28,8 +28,10 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @ToString @@ -42,6 +44,8 @@ public class AttributesSaveRequest { private final List entries; private final boolean notifyDevice; private final List previousCalculatedFieldIds; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback callback; public static Builder builder() { @@ -56,6 +60,8 @@ public class AttributesSaveRequest { private List entries; private boolean notifyDevice = true; private List previousCalculatedFieldIds; + private UUID tbMsgId; + private TbMsgType tbMsgType; private FutureCallback callback; Builder() {} @@ -108,6 +114,16 @@ public class AttributesSaveRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -128,7 +144,7 @@ public class AttributesSaveRequest { } public AttributesSaveRequest build() { - return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, previousCalculatedFieldIds, callback); + return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 95eb788e5f..957b0cf108 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -27,8 +27,10 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @AllArgsConstructor(access = AccessLevel.PRIVATE) @@ -42,6 +44,8 @@ public class TimeseriesSaveRequest { private final boolean saveLatest; private final boolean onlyLatest; private final List previousCalculatedFieldIds; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback callback; public static Builder builder() { @@ -59,6 +63,8 @@ public class TimeseriesSaveRequest { private boolean saveLatest = true; private boolean onlyLatest; private List previousCalculatedFieldIds; + private UUID tbMsgId; + private TbMsgType tbMsgType; Builder() {} @@ -111,6 +117,16 @@ public class TimeseriesSaveRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -131,7 +147,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, previousCalculatedFieldIds, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index ae2fce6575..925d70daa1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -126,6 +126,8 @@ public class TbMsgAttributesNode implements TbNode { .entries(attributes) .notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY))) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) .callback(callback) .build()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 89f7844313..3287cc825f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -113,6 +113,8 @@ public class TbMsgTimeseriesNode implements TbNode { .ttl(ttl) .saveLatest(!config.isSkipLatestPersistence()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build()); }