removed handling for attr/ts deletion in the rule node
This commit is contained in:
		
							parent
							
								
									b6b440fa2e
								
							
						
					
					
						commit
						62e6c0948c
					
				@ -31,7 +31,6 @@ import org.springframework.data.redis.core.RedisTemplate;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
 | 
			
		||||
@ -110,6 +109,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
 | 
			
		||||
 | 
			
		||||
@ -23,12 +23,12 @@ import org.bouncycastle.util.Arrays;
 | 
			
		||||
import org.thingsboard.common.util.DebugModeUtil;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ListeningExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
 | 
			
		||||
@ -904,7 +904,7 @@ public class DefaultTbContext implements TbContext {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldQueueService getCalculatedFieldQueueService() {
 | 
			
		||||
    public RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService() {
 | 
			
		||||
        return mainCtx.getCalculatedFieldQueueService();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -13,14 +13,19 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
package org.thingsboard.server.service.cf;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
public interface CalculatedFieldQueueService {
 | 
			
		||||
public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQueueService {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Filter CFs based on the request entity. Push to the queue if any matching CF exist;
 | 
			
		||||
@ -30,12 +35,8 @@ public interface CalculatedFieldQueueService {
 | 
			
		||||
     */
 | 
			
		||||
    void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback);
 | 
			
		||||
@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
 | 
			
		||||
@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
@ -51,6 +50,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.dao.util.KvUtils;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -28,7 +28,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
@ -57,6 +56,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.QueueKey;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
 | 
			
		||||
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,26 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
 | 
			
		||||
public interface RuleEngineCalculatedFieldQueueService {
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -360,7 +360,7 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    CalculatedFieldService getCalculatedFieldService();
 | 
			
		||||
 | 
			
		||||
    CalculatedFieldQueueService getCalculatedFieldQueueService();
 | 
			
		||||
    RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService();
 | 
			
		||||
 | 
			
		||||
    boolean isExternalNodeForceAck();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,13 +15,8 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.gson.JsonParser;
 | 
			
		||||
import jakarta.annotation.Nullable;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
@ -29,7 +24,6 @@ import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
@ -42,23 +36,23 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RuleNode(
 | 
			
		||||
        type = ComponentType.ACTION,
 | 
			
		||||
        name = "apply to calculated fields",
 | 
			
		||||
        name = "calculated fields",
 | 
			
		||||
        configClazz = EmptyNodeConfiguration.class,
 | 
			
		||||
        nodeDescription = "Processes incoming messages for calculated fields",
 | 
			
		||||
        nodeDetails = "This node processes incoming messages to update telemetry or attributes for predefined calculated fields without storing the original telemetry or attributes in the database. " +
 | 
			
		||||
                "It ensures that calculated fields receive and process the necessary data without persisting the incoming values.",
 | 
			
		||||
        nodeDescription = "Pushes incoming messages to calculated fields service",
 | 
			
		||||
        nodeDetails = "Node enables the processing of calculated fields without persisting incoming messages to the database. " +
 | 
			
		||||
                "By default, the processing of calculated fields is triggered by the <b>save attributes</b> and <b>save time series</b> nodes. " +
 | 
			
		||||
                "This rule node accepts the same messages as these nodes but allows you to trigger the processing of calculated " +
 | 
			
		||||
                "fields independently, ensuring that derived data can be computed and utilized in real time without storing the original message in the database.",
 | 
			
		||||
        configDirective = "tbNodeEmptyConfig",
 | 
			
		||||
        icon = "call_made"
 | 
			
		||||
        icon = "published_with_changes"
 | 
			
		||||
)
 | 
			
		||||
public class TbCalculatedFieldsNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
@ -74,8 +68,6 @@ public class TbCalculatedFieldsNode implements TbNode {
 | 
			
		||||
        switch (msg.getInternalType()) {
 | 
			
		||||
            case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg);
 | 
			
		||||
            case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg);
 | 
			
		||||
            case TIMESERIES_DELETED -> processTimeSeriesDeleted(ctx, msg);
 | 
			
		||||
            case ATTRIBUTES_DELETED -> processAttributesDeleted(ctx, msg);
 | 
			
		||||
            default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -84,7 +76,7 @@ public class TbCalculatedFieldsNode implements TbNode {
 | 
			
		||||
        Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis());
 | 
			
		||||
 | 
			
		||||
        if (tsKvMap.isEmpty()) {
 | 
			
		||||
            ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + msg.getData()));
 | 
			
		||||
            ctx.tellSuccess(msg);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -130,76 +122,4 @@ public class TbCalculatedFieldsNode implements TbNode {
 | 
			
		||||
        ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        List<String> keysToDelete = Optional.ofNullable(
 | 
			
		||||
                JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("timeseries"), new TypeReference<List<String>>() {
 | 
			
		||||
                })
 | 
			
		||||
        ).orElse(Collections.emptyList());
 | 
			
		||||
 | 
			
		||||
        if (keysToDelete.isEmpty()) {
 | 
			
		||||
            ctx.tellSuccess(msg);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder()
 | 
			
		||||
                .tenantId(ctx.getTenantId())
 | 
			
		||||
                .entityId(msg.getOriginator())
 | 
			
		||||
                .keys(keysToDelete)
 | 
			
		||||
                .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
 | 
			
		||||
                .tbMsgId(msg.getId())
 | 
			
		||||
                .tbMsgType(msg.getInternalType())
 | 
			
		||||
                .callback(new FutureCallback<List<String>>() {
 | 
			
		||||
                    @Override
 | 
			
		||||
                    public void onSuccess(@Nullable List<String> tmp) {
 | 
			
		||||
                        ctx.tellSuccess(msg);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    @Override
 | 
			
		||||
                    public void onFailure(Throwable t) {
 | 
			
		||||
                        ctx.tellFailure(msg, t);
 | 
			
		||||
                    }
 | 
			
		||||
                })
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processAttributesDeleted(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        List<String> keysToDelete = Optional.ofNullable(
 | 
			
		||||
                JacksonUtil.convertValue(JacksonUtil.toJsonNode(msg.getData()).get("attributes"), new TypeReference<List<String>>() {
 | 
			
		||||
                })
 | 
			
		||||
        ).orElse(Collections.emptyList());
 | 
			
		||||
 | 
			
		||||
        if (keysToDelete.isEmpty()) {
 | 
			
		||||
            ctx.tellSuccess(msg);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder()
 | 
			
		||||
                .tenantId(ctx.getTenantId())
 | 
			
		||||
                .entityId(msg.getOriginator())
 | 
			
		||||
                .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
 | 
			
		||||
                .keys(keysToDelete)
 | 
			
		||||
                .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
 | 
			
		||||
                .tbMsgId(msg.getId())
 | 
			
		||||
                .tbMsgType(msg.getInternalType())
 | 
			
		||||
                .callback(new TelemetryNodeCallback(ctx, msg))
 | 
			
		||||
                .build();
 | 
			
		||||
        ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private FutureCallback<Void> getCalculatedFieldCallback(FutureCallback<List<String>> originalCallback, List<String> keys) {
 | 
			
		||||
        return new FutureCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(Void unused) {
 | 
			
		||||
                originalCallback.onSuccess(keys);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                originalCallback.onFailure(t);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user