diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 966b5d7277..ee9b6d115c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.KvEntry; @@ -27,7 +28,7 @@ public interface CalculatedFieldExecutionService { void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback); - void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List telemetry); + void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List calculatedFieldIds, List telemetry); void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 1128a75008..e5c119bba6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -18,11 +18,7 @@ package org.thingsboard.server.service.cf; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.*; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Getter; @@ -41,25 +37,8 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.AssetProfileId; -import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; -import org.thingsboard.server.common.data.kv.BooleanDataEntry; -import org.thingsboard.server.common.data.kv.DoubleDataEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.ReadTsKvQuery; -import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.TbMsg; @@ -301,7 +280,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List telemetry) { + public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List calculatedFieldIds, List telemetry) { try { EntityType entityType = entityId.getEntityType(); if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) { @@ -326,7 +305,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas )); if (!updatedTelemetry.isEmpty()) { - executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, updatedTelemetry); + executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, calculatedFieldIds, updatedTelemetry); } }); } @@ -335,7 +314,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { + private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List calculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId); CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService)); @@ -347,12 +326,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas case ASSET_PROFILE, DEVICE_PROFILE -> { boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId); if (isCommonEntity) { - getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues)); + getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds)); } else { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, calculatedFieldIds); } } - default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues); + default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, calculatedFieldIds); } log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId); } @@ -583,7 +562,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { @Override public void onSuccess(List results) { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, Collections.emptyList()); callback.onSuccess(); } @@ -671,7 +650,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return new StringDataEntry(key, defaultValue); } - private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues) { + private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List calculatedFieldIds) { CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); @@ -693,7 +672,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } if (allArgsPresent.test(state.getArguments())) { - performCalculation(calculatedFieldCtx, state, entityId); + performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds); } } }; @@ -714,13 +693,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas performUpdateState.accept(state); } - private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId) { + private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List calculatedFieldIds) { ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); Futures.addCallback(resultFuture, new FutureCallback<>() { @Override public void onSuccess(CalculatedFieldResult result) { if (result != null) { - pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result); + pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, calculatedFieldIds); } } @@ -739,13 +718,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); } - private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) { + private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List calculatedFieldIds) { try { String type = calculatedFieldResult.getType(); TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); - TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload)); + if (calculatedFieldIds.contains(calculatedFieldId)) { + throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop."); + } + calculatedFieldIds.add(calculatedFieldId); + TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).calculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); } catch (Exception e) { log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 2ddfd22db3..e35d3cead6 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -152,7 +152,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } - calculatedFieldExecutionService.onTelemetryUpdate(tenantId, entityId, request.getEntries()); + calculatedFieldExecutionService.onTelemetryUpdate(tenantId, entityId, request.getCalculatedFieldIds(), request.getEntries()); return saveFuture; } @@ -168,7 +168,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); - calculatedFieldExecutionService.onTelemetryUpdate(request.getTenantId(), request.getEntityId(), request.getEntries()); + calculatedFieldExecutionService.onTelemetryUpdate(request.getTenantId(), request.getEntityId(), request.getCalculatedFieldIds(), request.getEntries()); } @Override diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 64e05770fe..993fa24ff3 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -34,6 +34,8 @@ import org.thingsboard.server.common.msg.gen.MsgProtos; import org.thingsboard.server.common.msg.queue.TbMsgCallback; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.UUID; @@ -64,6 +66,8 @@ public final class TbMsg implements Serializable { private final UUID correlationId; private final Integer partition; + private final List calculatedFieldIds; + @Getter(value = AccessLevel.NONE) @JsonIgnore //This field is not serialized because we use queues and there is no need to do it @@ -112,7 +116,7 @@ public final class TbMsg implements Serializable { } private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, - RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, TbMsgProcessingCtx ctx, TbMsgCallback callback) { + RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, List calculatedFieldIds, TbMsgProcessingCtx ctx, TbMsgCallback callback) { this.id = id != null ? id : UUID.randomUUID(); this.queueName = queueName; if (ts > 0) { @@ -139,6 +143,7 @@ public final class TbMsg implements Serializable { this.ruleNodeId = ruleNodeId; this.correlationId = correlationId; this.partition = partition; + this.calculatedFieldIds = calculatedFieldIds; this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx(); this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); } @@ -200,6 +205,7 @@ public final class TbMsg implements Serializable { RuleNodeId ruleNodeId = null; UUID correlationId = null; Integer partition = null; + List calculatedFieldIds = new ArrayList<>(); if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); } @@ -214,6 +220,14 @@ public final class TbMsg implements Serializable { partition = proto.getPartition(); } + for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID( + cfIdProto.getCalculatedFieldIdMSB(), + cfIdProto.getCalculatedFieldIdLSB() + )); + calculatedFieldIds.add(calculatedFieldId); + } + TbMsgProcessingCtx ctx; if (proto.hasCtx()) { ctx = TbMsgProcessingCtx.fromProto(proto.getCtx()); @@ -224,7 +238,7 @@ public final class TbMsg implements Serializable { TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId, - metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, ctx, callback); + metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } @@ -343,10 +357,12 @@ public final class TbMsg implements Serializable { protected RuleNodeId ruleNodeId; protected UUID correlationId; protected Integer partition; + protected List calculatedFieldIds; protected TbMsgProcessingCtx ctx; protected TbMsgCallback callback; - TbMsgBuilder() {} + TbMsgBuilder() { + } TbMsgBuilder(TbMsg tbMsg) { this.queueName = tbMsg.queueName; @@ -363,6 +379,7 @@ public final class TbMsg implements Serializable { this.ruleNodeId = tbMsg.ruleNodeId; this.correlationId = tbMsg.correlationId; this.partition = tbMsg.partition; + this.calculatedFieldIds = tbMsg.calculatedFieldIds; this.ctx = tbMsg.ctx; this.callback = tbMsg.callback; } @@ -454,6 +471,11 @@ public final class TbMsg implements Serializable { return this; } + public TbMsgBuilder calculatedFieldIds(List calculatedFieldIds) { + this.calculatedFieldIds = calculatedFieldIds; + return this; + } + public TbMsgBuilder ctx(TbMsgProcessingCtx ctx) { this.ctx = ctx; return this; @@ -465,7 +487,7 @@ public final class TbMsg implements Serializable { } public TbMsg build() { - return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, ctx, callback); + return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); } public String toString() { diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto index fc9265aa14..36ab09f8d4 100644 --- a/common/message/src/main/proto/tbmsg.proto +++ b/common/message/src/main/proto/tbmsg.proto @@ -70,4 +70,11 @@ message TbMsgProto { int64 correlationIdMSB = 20; int64 correlationIdLSB = 21; int32 partition = 22; + + repeated CalculatedFieldIdProto calculatedFields = 23; +} + +message CalculatedFieldIdProto { + int64 calculatedFieldIdMSB = 1; + int64 calculatedFieldIdLSB = 2; } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index 22fa8de6de..9747a6033e 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -22,6 +22,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -40,6 +41,7 @@ public class AttributesSaveRequest { private final AttributeScope scope; private final List entries; private final boolean notifyDevice; + private final List calculatedFieldIds; private final FutureCallback callback; public static Builder builder() { @@ -53,6 +55,7 @@ public class AttributesSaveRequest { private AttributeScope scope; private List entries; private boolean notifyDevice = true; + private List calculatedFieldIds; private FutureCallback callback; Builder() {} @@ -100,6 +103,11 @@ public class AttributesSaveRequest { return this; } + public Builder calculatedFieldIds(List calculatedFieldIds) { + this.calculatedFieldIds = calculatedFieldIds; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -120,7 +128,7 @@ public class AttributesSaveRequest { } public AttributesSaveRequest build() { - return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, callback); + return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, calculatedFieldIds, callback); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 2b5881212d..12afa2d939 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -40,6 +41,7 @@ public class TimeseriesSaveRequest { private final long ttl; private final boolean saveLatest; private final boolean onlyLatest; + private final List calculatedFieldIds; private final FutureCallback callback; public static Builder builder() { @@ -56,6 +58,7 @@ public class TimeseriesSaveRequest { private FutureCallback callback; private boolean saveLatest = true; private boolean onlyLatest; + private List calculatedFieldIds; Builder() {} @@ -103,6 +106,11 @@ public class TimeseriesSaveRequest { return this; } + public Builder calculatedFieldIds(List calculatedFieldIds) { + this.calculatedFieldIds = calculatedFieldIds; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -123,7 +131,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, calculatedFieldIds, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 27f45feb47..386e56320a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -112,6 +112,7 @@ public class TbMsgTimeseriesNode implements TbNode { .entries(tsKvEntryList) .ttl(ttl) .saveLatest(!config.isSkipLatestPersistence()) + .calculatedFieldIds(msg.getCalculatedFieldIds()) .callback(new TelemetryNodeCallback(ctx, msg)) .build()); }