added calculated fields rule node

This commit is contained in:
IrynaMatveieva 2025-03-05 17:12:26 +02:00
parent e6438c51f0
commit 8e5cfb776d
9 changed files with 282 additions and 20 deletions

View File

@ -31,6 +31,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
@ -544,6 +545,11 @@ public class ActorSystemContext {
@Getter @Getter
private CalculatedFieldStateService calculatedFieldStateService; private CalculatedFieldStateService calculatedFieldStateService;
@Lazy
@Autowired(required = false)
@Getter
private CalculatedFieldQueueService calculatedFieldQueueService;
@Lazy @Lazy
@Autowired(required = false) @Autowired(required = false)
@Getter @Getter

View File

@ -23,6 +23,7 @@ import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
@ -902,6 +903,11 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getCalculatedFieldService(); return mainCtx.getCalculatedFieldService();
} }
@Override
public CalculatedFieldQueueService getCalculatedFieldQueueService() {
return mainCtx.getCalculatedFieldQueueService();
}
@Override @Override
public boolean isExternalNodeForceAck() { public boolean isExternalNodeForceAck() {
return mainCtx.isExternalNodeForceAck(); return mainCtx.isExternalNodeForceAck();

View File

@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
@ -47,6 +48,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -89,6 +91,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback); () -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
} }
@Override
public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()),
() -> toCalculatedFieldTelemetryMsgProto(request), callback);
}
@Override @Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) { public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId(); var tenantId = request.getTenantId();
@ -97,6 +107,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback); () -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
} }
@Override
public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
() -> toCalculatedFieldTelemetryMsgProto(request), callback);
}
@Override @Override
public void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback) { public void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId(); var tenantId = request.getTenantId();
@ -152,22 +170,34 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
}; };
} }
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request) {
return toCalculatedFieldTelemetryMsgProto(request, null);
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
List<TsKvEntry> entries = request.getEntries();
List<Long> versions = result.getVersions();
for (int i = 0; i < entries.size(); i++) {
long tsVersion = versions.get(i);
TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
telemetryMsg.addTsData(tsProto);
}
msg.setTelemetryMsg(telemetryMsg.build());
List<TsKvEntry> entries = request.getEntries();
List<Long> versions = result != null ? result.getVersions() : Collections.emptyList();
for (int i = 0; i < entries.size(); i++) {
TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder();
if (result != null) {
tsProtoBuilder.setVersion(versions.get(i));
}
telemetryMsg.addTsData(tsProtoBuilder.build());
}
msg.setTelemetryMsg(telemetryMsg.build());
return msg.build(); return msg.build();
} }
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request) {
return toCalculatedFieldTelemetryMsgProto(request, null);
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) { private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
@ -175,9 +205,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries(); List<AttributeKvEntry> entries = request.getEntries();
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
long attrVersion = versions.get(i); AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder();
AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); if (versions != null) {
telemetryMsg.addAttrData(attrProto); attrProtoBuilder.setVersion(versions.get(i));
}
telemetryMsg.addAttrData(attrProtoBuilder.build());
} }
msg.setTelemetryMsg(telemetryMsg.build()); msg.setTelemetryMsg(telemetryMsg.build());

View File

@ -42,13 +42,17 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
public SingleValueArgumentEntry(TsKvProto entry) { public SingleValueArgumentEntry(TsKvProto entry) {
this.ts = entry.getTs(); this.ts = entry.getTs();
this.version = entry.getVersion(); if (entry.hasVersion()) {
this.version = entry.getVersion();
}
this.kvEntryValue = ProtoUtils.fromProto(entry.getKv()); this.kvEntryValue = ProtoUtils.fromProto(entry.getKv());
} }
public SingleValueArgumentEntry(AttributeValueProto entry) { public SingleValueArgumentEntry(AttributeValueProto entry) {
this.ts = entry.getLastUpdateTs(); this.ts = entry.getLastUpdateTs();
this.version = entry.getVersion(); if (entry.hasVersion()) {
this.version = entry.getVersion();
}
this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry); this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry);
} }

View File

@ -31,6 +31,7 @@ import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
@ -50,7 +51,6 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.dao.util.KvUtils;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;

View File

@ -28,6 +28,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.rule.engine.api.CalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordKey;
@ -56,7 +57,6 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.SubscriptionManagerService;

View File

@ -13,13 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.cf; package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import java.util.List; import java.util.List;
@ -34,8 +30,12 @@ public interface CalculatedFieldQueueService {
*/ */
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback); void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback);
void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback); void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback);

View File

@ -360,6 +360,8 @@ public interface TbContext {
CalculatedFieldService getCalculatedFieldService(); CalculatedFieldService getCalculatedFieldService();
CalculatedFieldQueueService getCalculatedFieldQueueService();
boolean isExternalNodeForceAck(); boolean isExternalNodeForceAck();
/** /**

View File

@ -0,0 +1,212 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.gson.JsonParser;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "push to calculated fields",
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "Pushes messages to the calculated fields for further processing",
nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database.",
configDirective = "tbNodeEmptyConfig",
icon = "send"
)
public class TbCalculatedFieldsNode implements TbNode {
private EmptyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if (!msg.isTypeOneOf(POST_ATTRIBUTES_REQUEST, POST_TELEMETRY_REQUEST, ATTRIBUTES_DELETED, TIMESERIES_DELETED)) {
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
return;
}
String src = msg.getData();
if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) {
processPostTelemetryRequest(ctx, msg, src);
} else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) {
processPostAttributesRequest(ctx, msg, src);
} else if (msg.isTypeOf(TIMESERIES_DELETED)) {
processTimeSeriesDeleted(ctx, msg, src);
} else {
processAttributesDeleted(ctx, msg, src);
}
}
private void processPostTelemetryRequest(TbContext ctx, TbMsg msg, String src) {
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), System.currentTimeMillis());
if (tsKvMap.isEmpty()) {
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
return;
}
List<TsKvEntry> tsKvEntryList = new ArrayList<>();
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
for (KvEntry kvEntry : tsKvEntry.getValue()) {
tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
}
}
TimeseriesSaveRequest timeseriesSaveRequest = TimeseriesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())
.entries(tsKvEntryList)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(new TelemetryNodeCallback(ctx, msg))
.build();
ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback());
}
private void processPostAttributesRequest(TbContext ctx, TbMsg msg, String src) {
List<AttributeKvEntry> newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src)));
if (newAttributes.isEmpty()) {
ctx.tellSuccess(msg);
return;
}
AttributesSaveRequest attributesSaveRequest = AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
.entries(newAttributes)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(new TelemetryNodeCallback(ctx, msg))
.build();
ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback());
}
private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg, String src) {
JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
List<String> keysToDelete = JacksonUtil.convertValue(dataJson.get("timeSeries"), new TypeReference<>() {
});
if (keysToDelete.isEmpty()) {
ctx.tellSuccess(msg);
return;
}
TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.keys(keysToDelete)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> tmp) {
}
@Override
public void onFailure(Throwable t) {
}
})
.build();
ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete));
}
private void processAttributesDeleted(TbContext ctx, TbMsg msg, String src) {
JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
List<String> keysToDelete = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
});
if (keysToDelete.isEmpty()) {
ctx.tellSuccess(msg);
return;
}
AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
.keys(keysToDelete)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(new TelemetryNodeCallback(ctx, msg))
.build();
ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback());
}
private FutureCallback<Void> getCalculatedFieldCallback(FutureCallback<List<String>> originalCallback, List<String> keys) {
return new FutureCallback<Void>() {
@Override
public void onSuccess(Void unused) {
originalCallback.onSuccess(keys);
}
@Override
public void onFailure(Throwable t) {
originalCallback.onFailure(t);
}
};
}
}