From 8e5cfb776dae4cfc1ffccf36c02c09a70d93d328 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 5 Mar 2025 17:12:26 +0200 Subject: [PATCH 1/3] added calculated fields rule node --- .../server/actors/ActorSystemContext.java | 6 + .../actors/ruleChain/DefaultTbContext.java | 6 + .../DefaultCalculatedFieldQueueService.java | 54 ++++- .../ctx/state/SingleValueArgumentEntry.java | 8 +- .../DefaultTelemetrySubscriptionService.java | 2 +- ...faultTelemetrySubscriptionServiceTest.java | 2 +- .../api}/CalculatedFieldQueueService.java | 10 +- .../rule/engine/api/TbContext.java | 2 + .../telemetry/TbCalculatedFieldsNode.java | 212 ++++++++++++++++++ 9 files changed, 282 insertions(+), 20 deletions(-) rename {application/src/main/java/org/thingsboard/server/service/cf => rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api}/CalculatedFieldQueueService.java (83%) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index de3864df9f..442ff1d11f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -31,6 +31,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; @@ -544,6 +545,11 @@ public class ActorSystemContext { @Getter private CalculatedFieldStateService calculatedFieldStateService; + @Lazy + @Autowired(required = false) + @Getter + private CalculatedFieldQueueService calculatedFieldQueueService; + @Lazy @Autowired(required = false) @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index d162b6a09a..a5b43d3223 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -23,6 +23,7 @@ import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; @@ -902,6 +903,11 @@ public class DefaultTbContext implements TbContext { return mainCtx.getCalculatedFieldService(); } + @Override + public CalculatedFieldQueueService getCalculatedFieldQueueService() { + return mainCtx.getCalculatedFieldQueueService(); + } + @Override public boolean isExternalNodeForceAck() { return mainCtx.isExternalNodeForceAck(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 3ca0b220ff..c1ffb10e79 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; @@ -47,6 +48,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -89,6 +91,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), + () -> toCalculatedFieldTelemetryMsgProto(request), callback); + } + @Override public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); @@ -97,6 +107,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), + () -> toCalculatedFieldTelemetryMsgProto(request), callback); + } + @Override public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); @@ -152,22 +170,34 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS }; } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request) { + return toCalculatedFieldTelemetryMsgProto(request, null); + } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); - List entries = request.getEntries(); - List versions = result.getVersions(); - for (int i = 0; i < entries.size(); i++) { - long tsVersion = versions.get(i); - TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); - telemetryMsg.addTsData(tsProto); - } - msg.setTelemetryMsg(telemetryMsg.build()); + List entries = request.getEntries(); + List versions = result != null ? result.getVersions() : Collections.emptyList(); + + for (int i = 0; i < entries.size(); i++) { + TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); + if (result != null) { + tsProtoBuilder.setVersion(versions.get(i)); + } + telemetryMsg.addTsData(tsProtoBuilder.build()); + } + + msg.setTelemetryMsg(telemetryMsg.build()); return msg.build(); } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request) { + return toCalculatedFieldTelemetryMsgProto(request, null); + } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); @@ -175,9 +205,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); List entries = request.getEntries(); for (int i = 0; i < entries.size(); i++) { - long attrVersion = versions.get(i); - AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); - telemetryMsg.addAttrData(attrProto); + AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); + if (versions != null) { + attrProtoBuilder.setVersion(versions.get(i)); + } + telemetryMsg.addAttrData(attrProtoBuilder.build()); } msg.setTelemetryMsg(telemetryMsg.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index a237f3d022..bfe9eed24f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -42,13 +42,17 @@ public class SingleValueArgumentEntry implements ArgumentEntry { public SingleValueArgumentEntry(TsKvProto entry) { this.ts = entry.getTs(); - this.version = entry.getVersion(); + if (entry.hasVersion()) { + this.version = entry.getVersion(); + } this.kvEntryValue = ProtoUtils.fromProto(entry.getKv()); } public SingleValueArgumentEntry(AttributeValueProto entry) { this.ts = entry.getLastUpdateTs(); - this.version = entry.getVersion(); + if (entry.hasVersion()) { + this.version = entry.getVersion(); + } this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 2bdbd917ab..31ae9e8055 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,6 +31,7 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -50,7 +51,6 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b8bfa13e50..b47bea18d8 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -56,7 +57,6 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java similarity index 83% rename from application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java rename to rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java index b688993a8d..b4eb24baea 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java @@ -13,13 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.cf; +package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; -import org.thingsboard.rule.engine.api.AttributesDeleteRequest; -import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; -import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; @@ -34,8 +30,12 @@ public interface CalculatedFieldQueueService { */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); + void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); + void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 6ba4eae0e4..1abad50a8d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -360,6 +360,8 @@ public interface TbContext { CalculatedFieldService getCalculatedFieldService(); + CalculatedFieldQueueService getCalculatedFieldQueueService(); + boolean isExternalNodeForceAck(); /** diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java new file mode 100644 index 0000000000..ac0e61d9be --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -0,0 +1,212 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.FutureCallback; +import com.google.gson.JsonParser; +import jakarta.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; +import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "push to calculated fields", + configClazz = EmptyNodeConfiguration.class, + nodeDescription = "Pushes messages to the calculated fields for further processing", + nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database.", + configDirective = "tbNodeEmptyConfig", + icon = "send" +) +public class TbCalculatedFieldsNode implements TbNode { + + private EmptyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (!msg.isTypeOneOf(POST_ATTRIBUTES_REQUEST, POST_TELEMETRY_REQUEST, ATTRIBUTES_DELETED, TIMESERIES_DELETED)) { + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); + return; + } + + String src = msg.getData(); + + if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { + processPostTelemetryRequest(ctx, msg, src); + } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { + processPostAttributesRequest(ctx, msg, src); + } else if (msg.isTypeOf(TIMESERIES_DELETED)) { + processTimeSeriesDeleted(ctx, msg, src); + } else { + processAttributesDeleted(ctx, msg, src); + } + + } + + private void processPostTelemetryRequest(TbContext ctx, TbMsg msg, String src) { + Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), System.currentTimeMillis()); + if (tsKvMap.isEmpty()) { + ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); + return; + } + + List tsKvEntryList = new ArrayList<>(); + for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { + for (KvEntry kvEntry : tsKvEntry.getValue()) { + tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + } + } + + TimeseriesSaveRequest timeseriesSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(ctx.getTenantId()) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entries(tsKvEntryList) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + + ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); + } + + private void processPostAttributesRequest(TbContext ctx, TbMsg msg, String src) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + + if (newAttributes.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + AttributesSaveRequest attributesSaveRequest = AttributesSaveRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) + .entries(newAttributes) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); + } + + private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg, String src) { + JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); + List keysToDelete = JacksonUtil.convertValue(dataJson.get("timeSeries"), new TypeReference<>() { + }); + + if (keysToDelete.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .keys(keysToDelete) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new FutureCallback>() { + @Override + public void onSuccess(@Nullable List tmp) { + } + + @Override + public void onFailure(Throwable t) { + } + }) + .build(); + + ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); + } + + private void processAttributesDeleted(TbContext ctx, TbMsg msg, String src) { + JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); + List keysToDelete = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); + + if (keysToDelete.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) + .keys(keysToDelete) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback()); + } + + private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { + return new FutureCallback() { + @Override + public void onSuccess(Void unused) { + originalCallback.onSuccess(keys); + } + + @Override + public void onFailure(Throwable t) { + originalCallback.onFailure(t); + } + }; + } + +} From b6b440fa2e374a3012e889db40651b2dee3f71d0 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 6 Mar 2025 10:12:29 +0200 Subject: [PATCH 2/3] updated desscription --- .../DefaultCalculatedFieldQueueService.java | 18 +---- .../telemetry/TbCalculatedFieldsNode.java | 69 +++++++++---------- 2 files changed, 33 insertions(+), 54 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index c1ffb10e79..fc92b70e07 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -93,10 +93,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS @Override public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), - () -> toCalculatedFieldTelemetryMsgProto(request), callback); + pushRequestToQueue(request, null, callback); } @Override @@ -109,10 +106,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS @Override public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback) { - var tenantId = request.getTenantId(); - var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), - () -> toCalculatedFieldTelemetryMsgProto(request), callback); + pushRequestToQueue(request, null, callback); } @Override @@ -170,10 +164,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS }; } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request) { - return toCalculatedFieldTelemetryMsgProto(request, null); - } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); @@ -194,10 +184,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request) { - return toCalculatedFieldTelemetryMsgProto(request, null); - } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index ac0e61d9be..1f2c7f135d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -16,7 +16,6 @@ package org.thingsboard.rule.engine.telemetry; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.gson.JsonParser; import jakarta.annotation.Nullable; @@ -43,24 +42,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.thingsboard.server.common.data.DataConstants.SCOPE; -import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; -import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; -import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "push to calculated fields", + name = "apply to calculated fields", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "Pushes messages to the calculated fields for further processing", - nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database.", + nodeDescription = "Processes incoming messages for calculated fields", + nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " + + "It ensures that calculated fields receive and process the necessary data without persisting the incoming values.", configDirective = "tbNodeEmptyConfig", - icon = "send" + icon = "call_made" ) public class TbCalculatedFieldsNode implements TbNode { @@ -73,29 +71,20 @@ public class TbCalculatedFieldsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (!msg.isTypeOneOf(POST_ATTRIBUTES_REQUEST, POST_TELEMETRY_REQUEST, ATTRIBUTES_DELETED, TIMESERIES_DELETED)) { - ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); - return; + switch (msg.getInternalType()) { + case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg); + case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg); + case TIMESERIES_DELETED -> processTimeSeriesDeleted(ctx, msg); + case ATTRIBUTES_DELETED -> processAttributesDeleted(ctx, msg); + default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); } - - String src = msg.getData(); - - if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { - processPostTelemetryRequest(ctx, msg, src); - } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { - processPostAttributesRequest(ctx, msg, src); - } else if (msg.isTypeOf(TIMESERIES_DELETED)) { - processTimeSeriesDeleted(ctx, msg, src); - } else { - processAttributesDeleted(ctx, msg, src); - } - } - private void processPostTelemetryRequest(TbContext ctx, TbMsg msg, String src) { - Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), System.currentTimeMillis()); + private void processPostTelemetryRequest(TbContext ctx, TbMsg msg) { + Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis()); + if (tsKvMap.isEmpty()) { - ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); + ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + msg.getData())); return; } @@ -120,8 +109,8 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); } - private void processPostAttributesRequest(TbContext ctx, TbMsg msg, String src) { - List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + private void processPostAttributesRequest(TbContext ctx, TbMsg msg) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()))); if (newAttributes.isEmpty()) { ctx.tellSuccess(msg); @@ -141,10 +130,11 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); } - private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg, String src) { - JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); - List keysToDelete = JacksonUtil.convertValue(dataJson.get("timeSeries"), new TypeReference<>() { - }); + private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg) { + List keysToDelete = Optional.ofNullable( + JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("timeseries"), new TypeReference>() { + }) + ).orElse(Collections.emptyList()); if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg); @@ -161,10 +151,12 @@ public class TbCalculatedFieldsNode implements TbNode { .callback(new FutureCallback>() { @Override public void onSuccess(@Nullable List tmp) { + ctx.tellSuccess(msg); } @Override public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); } }) .build(); @@ -172,10 +164,11 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); } - private void processAttributesDeleted(TbContext ctx, TbMsg msg, String src) { - JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); - List keysToDelete = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { - }); + private void processAttributesDeleted(TbContext ctx, TbMsg msg) { + List keysToDelete = Optional.ofNullable( + JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("attributes"), new TypeReference>() { + }) + ).orElse(Collections.emptyList()); if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg); From 62e6c0948ca84eb177b9aae952b04a1e9e64346c Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 6 Mar 2025 16:59:12 +0200 Subject: [PATCH 3/3] removed handling for attr/ts deletion in the rule node --- .../server/actors/ActorSystemContext.java | 2 +- .../actors/ruleChain/DefaultTbContext.java | 4 +- .../cf}/CalculatedFieldQueueService.java | 13 +-- .../DefaultCalculatedFieldQueueService.java | 1 - .../DefaultTelemetrySubscriptionService.java | 2 +- ...faultTelemetrySubscriptionServiceTest.java | 2 +- ...RuleEngineCalculatedFieldQueueService.java | 26 +++++ .../rule/engine/api/TbContext.java | 2 +- .../telemetry/TbCalculatedFieldsNode.java | 96 ++----------------- 9 files changed, 47 insertions(+), 101 deletions(-) rename {rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api => application/src/main/java/org/thingsboard/server/service/cf}/CalculatedFieldQueueService.java (75%) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 442ff1d11f..effca0e126 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -31,7 +31,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; @@ -110,6 +109,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a5b43d3223..2943c44827 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -23,12 +23,12 @@ import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache; +import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineRpcService; @@ -904,7 +904,7 @@ public class DefaultTbContext implements TbContext { } @Override - public CalculatedFieldQueueService getCalculatedFieldQueueService() { + public RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService() { return mainCtx.getCalculatedFieldQueueService(); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java similarity index 75% rename from rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java rename to application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index b4eb24baea..eb86220361 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -13,14 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.rule.engine.api; +package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; -public interface CalculatedFieldQueueService { +public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQueueService { /** * Filter CFs based on the request entity. Push to the queue if any matching CF exist; @@ -30,12 +35,8 @@ public interface CalculatedFieldQueueService { */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); - void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index fc92b70e07..ba4be6ace6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 31ae9e8055..2bdbd917ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -51,6 +50,7 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b47bea18d8..b8bfa13e50 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -28,7 +28,6 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -57,6 +56,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java new file mode 100644 index 0000000000..6ab40b79c2 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FutureCallback; + +public interface RuleEngineCalculatedFieldQueueService { + + void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); + + void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1abad50a8d..9053d96dfe 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -360,7 +360,7 @@ public interface TbContext { CalculatedFieldService getCalculatedFieldService(); - CalculatedFieldQueueService getCalculatedFieldQueueService(); + RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService(); boolean isExternalNodeForceAck(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index 1f2c7f135d..b2cce52c87 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -15,13 +15,8 @@ */ package org.thingsboard.rule.engine.telemetry; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.util.concurrent.FutureCallback; import com.google.gson.JsonParser; -import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -29,7 +24,6 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.adaptor.JsonConverter; @@ -42,23 +36,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.thingsboard.server.common.data.DataConstants.SCOPE; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "apply to calculated fields", + name = "calculated fields", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "Processes incoming messages for calculated fields", - nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " + - "It ensures that calculated fields receive and process the necessary data without persisting the incoming values.", + nodeDescription = "Pushes incoming messages to calculated fields service", + nodeDetails = "Node enables the processing of calculated fields without persisting incoming messages to the database. " + + "By default, the processing of calculated fields is triggered by the save attributes and save time series nodes. " + + "This rule node accepts the same messages as these nodes but allows you to trigger the processing of calculated " + + "fields independently, ensuring that derived data can be computed and utilized in real time without storing the original message in the database.", configDirective = "tbNodeEmptyConfig", - icon = "call_made" + icon = "published_with_changes" ) public class TbCalculatedFieldsNode implements TbNode { @@ -74,8 +68,6 @@ public class TbCalculatedFieldsNode implements TbNode { switch (msg.getInternalType()) { case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg); case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg); - case TIMESERIES_DELETED -> processTimeSeriesDeleted(ctx, msg); - case ATTRIBUTES_DELETED -> processAttributesDeleted(ctx, msg); default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); } } @@ -84,7 +76,7 @@ public class TbCalculatedFieldsNode implements TbNode { Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis()); if (tsKvMap.isEmpty()) { - ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + msg.getData())); + ctx.tellSuccess(msg); return; } @@ -130,76 +122,4 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); } - private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg) { - List keysToDelete = Optional.ofNullable( - JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("timeseries"), new TypeReference>() { - }) - ).orElse(Collections.emptyList()); - - if (keysToDelete.isEmpty()) { - ctx.tellSuccess(msg); - return; - } - - TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder() - .tenantId(ctx.getTenantId()) - .entityId(msg.getOriginator()) - .keys(keysToDelete) - .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) - .tbMsgId(msg.getId()) - .tbMsgType(msg.getInternalType()) - .callback(new FutureCallback>() { - @Override - public void onSuccess(@Nullable List tmp) { - ctx.tellSuccess(msg); - } - - @Override - public void onFailure(Throwable t) { - ctx.tellFailure(msg, t); - } - }) - .build(); - - ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); - } - - private void processAttributesDeleted(TbContext ctx, TbMsg msg) { - List keysToDelete = Optional.ofNullable( - JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("attributes"), new TypeReference>() { - }) - ).orElse(Collections.emptyList()); - - if (keysToDelete.isEmpty()) { - ctx.tellSuccess(msg); - return; - } - - AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder() - .tenantId(ctx.getTenantId()) - .entityId(msg.getOriginator()) - .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) - .keys(keysToDelete) - .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) - .tbMsgId(msg.getId()) - .tbMsgType(msg.getInternalType()) - .callback(new TelemetryNodeCallback(ctx, msg)) - .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback()); - } - - private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { - return new FutureCallback() { - @Override - public void onSuccess(Void unused) { - originalCallback.onSuccess(keys); - } - - @Override - public void onFailure(Throwable t) { - originalCallback.onFailure(t); - } - }; - } - }