From 62e6c0948ca84eb177b9aae952b04a1e9e64346c Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 6 Mar 2025 16:59:12 +0200 Subject: [PATCH] removed handling for attr/ts deletion in the rule node --- .../server/actors/ActorSystemContext.java | 2 +- .../actors/ruleChain/DefaultTbContext.java | 4 +- .../cf}/CalculatedFieldQueueService.java | 13 +-- .../DefaultCalculatedFieldQueueService.java | 1 - .../DefaultTelemetrySubscriptionService.java | 2 +- ...faultTelemetrySubscriptionServiceTest.java | 2 +- ...RuleEngineCalculatedFieldQueueService.java | 26 +++++ .../rule/engine/api/TbContext.java | 2 +- .../telemetry/TbCalculatedFieldsNode.java | 96 ++----------------- 9 files changed, 47 insertions(+), 101 deletions(-) rename {rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api => application/src/main/java/org/thingsboard/server/service/cf}/CalculatedFieldQueueService.java (75%) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 442ff1d11f..effca0e126 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -31,7 +31,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; @@ -110,6 +109,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a5b43d3223..2943c44827 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -23,12 +23,12 @@ import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache; +import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineRpcService; @@ -904,7 +904,7 @@ public class DefaultTbContext implements TbContext { } @Override - public CalculatedFieldQueueService getCalculatedFieldQueueService() { + public RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService() { return mainCtx.getCalculatedFieldQueueService(); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java similarity index 75% rename from rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java rename to application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index b4eb24baea..eb86220361 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -13,14 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.rule.engine.api; +package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; -public interface CalculatedFieldQueueService { +public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQueueService { /** * Filter CFs based on the request entity. Push to the queue if any matching CF exist; @@ -30,12 +35,8 @@ public interface CalculatedFieldQueueService { */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); - void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index fc92b70e07..ba4be6ace6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 31ae9e8055..2bdbd917ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -51,6 +50,7 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b47bea18d8..b8bfa13e50 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -28,7 +28,6 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -57,6 +56,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java new file mode 100644 index 0000000000..6ab40b79c2 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineCalculatedFieldQueueService.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FutureCallback; + +public interface RuleEngineCalculatedFieldQueueService { + + void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); + + void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1abad50a8d..9053d96dfe 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -360,7 +360,7 @@ public interface TbContext { CalculatedFieldService getCalculatedFieldService(); - CalculatedFieldQueueService getCalculatedFieldQueueService(); + RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService(); boolean isExternalNodeForceAck(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index 1f2c7f135d..b2cce52c87 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -15,13 +15,8 @@ */ package org.thingsboard.rule.engine.telemetry; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.util.concurrent.FutureCallback; import com.google.gson.JsonParser; -import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -29,7 +24,6 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.adaptor.JsonConverter; @@ -42,23 +36,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.thingsboard.server.common.data.DataConstants.SCOPE; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "apply to calculated fields", + name = "calculated fields", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "Processes incoming messages for calculated fields", - nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " + - "It ensures that calculated fields receive and process the necessary data without persisting the incoming values.", + nodeDescription = "Pushes incoming messages to calculated fields service", + nodeDetails = "Node enables the processing of calculated fields without persisting incoming messages to the database. " + + "By default, the processing of calculated fields is triggered by the save attributes and save time series nodes. " + + "This rule node accepts the same messages as these nodes but allows you to trigger the processing of calculated " + + "fields independently, ensuring that derived data can be computed and utilized in real time without storing the original message in the database.", configDirective = "tbNodeEmptyConfig", - icon = "call_made" + icon = "published_with_changes" ) public class TbCalculatedFieldsNode implements TbNode { @@ -74,8 +68,6 @@ public class TbCalculatedFieldsNode implements TbNode { switch (msg.getInternalType()) { case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg); case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg); - case TIMESERIES_DELETED -> processTimeSeriesDeleted(ctx, msg); - case ATTRIBUTES_DELETED -> processAttributesDeleted(ctx, msg); default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); } } @@ -84,7 +76,7 @@ public class TbCalculatedFieldsNode implements TbNode { Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis()); if (tsKvMap.isEmpty()) { - ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + msg.getData())); + ctx.tellSuccess(msg); return; } @@ -130,76 +122,4 @@ public class TbCalculatedFieldsNode implements TbNode { ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); } - private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg) { - List keysToDelete = Optional.ofNullable( - JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("timeseries"), new TypeReference>() { - }) - ).orElse(Collections.emptyList()); - - if (keysToDelete.isEmpty()) { - ctx.tellSuccess(msg); - return; - } - - TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder() - .tenantId(ctx.getTenantId()) - .entityId(msg.getOriginator()) - .keys(keysToDelete) - .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) - .tbMsgId(msg.getId()) - .tbMsgType(msg.getInternalType()) - .callback(new FutureCallback>() { - @Override - public void onSuccess(@Nullable List tmp) { - ctx.tellSuccess(msg); - } - - @Override - public void onFailure(Throwable t) { - ctx.tellFailure(msg, t); - } - }) - .build(); - - ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); - } - - private void processAttributesDeleted(TbContext ctx, TbMsg msg) { - List keysToDelete = Optional.ofNullable( - JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("attributes"), new TypeReference>() { - }) - ).orElse(Collections.emptyList()); - - if (keysToDelete.isEmpty()) { - ctx.tellSuccess(msg); - return; - } - - AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder() - .tenantId(ctx.getTenantId()) - .entityId(msg.getOriginator()) - .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) - .keys(keysToDelete) - .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) - .tbMsgId(msg.getId()) - .tbMsgType(msg.getInternalType()) - .callback(new TelemetryNodeCallback(ctx, msg)) - .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback()); - } - - private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { - return new FutureCallback() { - @Override - public void onSuccess(Void unused) { - originalCallback.onSuccess(keys); - } - - @Override - public void onFailure(Throwable t) { - originalCallback.onFailure(t); - } - }; - } - }