Merge pull request #12833 from irynamatveieva/calculated-field-rule-node
Added calculated fields rule node
This commit is contained in:
commit
f55967ee98
@ -109,6 +109,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
|
||||
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
|
||||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
|
||||
@ -544,6 +545,11 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private CalculatedFieldStateService calculatedFieldStateService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private CalculatedFieldQueueService calculatedFieldQueueService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.NotificationCenter;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
|
||||
import org.thingsboard.rule.engine.api.DeviceStateManager;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
|
||||
@ -902,6 +903,11 @@ public class DefaultTbContext implements TbContext {
|
||||
return mainCtx.getCalculatedFieldService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService() {
|
||||
return mainCtx.getCalculatedFieldQueueService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExternalNodeForceAck() {
|
||||
return mainCtx.isExternalNodeForceAck();
|
||||
|
||||
@ -18,13 +18,14 @@ package org.thingsboard.server.service.cf;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
|
||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface CalculatedFieldQueueService {
|
||||
public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQueueService {
|
||||
|
||||
/**
|
||||
* Filter CFs based on the request entity. Push to the queue if any matching CF exist;
|
||||
|
||||
@ -47,6 +47,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -89,6 +90,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback) {
|
||||
pushRequestToQueue(request, null, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
@ -97,6 +103,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback) {
|
||||
pushRequestToQueue(request, null, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
|
||||
var tenantId = request.getTenantId();
|
||||
@ -156,15 +167,19 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
|
||||
|
||||
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
|
||||
List<TsKvEntry> entries = request.getEntries();
|
||||
List<Long> versions = result.getVersions();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
long tsVersion = versions.get(i);
|
||||
TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
|
||||
telemetryMsg.addTsData(tsProto);
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
|
||||
List<TsKvEntry> entries = request.getEntries();
|
||||
List<Long> versions = result != null ? result.getVersions() : Collections.emptyList();
|
||||
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder();
|
||||
if (result != null) {
|
||||
tsProtoBuilder.setVersion(versions.get(i));
|
||||
}
|
||||
telemetryMsg.addTsData(tsProtoBuilder.build());
|
||||
}
|
||||
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
return msg.build();
|
||||
}
|
||||
|
||||
@ -175,9 +190,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
||||
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
|
||||
List<AttributeKvEntry> entries = request.getEntries();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
long attrVersion = versions.get(i);
|
||||
AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build();
|
||||
telemetryMsg.addAttrData(attrProto);
|
||||
AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder();
|
||||
if (versions != null) {
|
||||
attrProtoBuilder.setVersion(versions.get(i));
|
||||
}
|
||||
telemetryMsg.addAttrData(attrProtoBuilder.build());
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
|
||||
|
||||
@ -42,13 +42,17 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
|
||||
public SingleValueArgumentEntry(TsKvProto entry) {
|
||||
this.ts = entry.getTs();
|
||||
this.version = entry.getVersion();
|
||||
if (entry.hasVersion()) {
|
||||
this.version = entry.getVersion();
|
||||
}
|
||||
this.kvEntryValue = ProtoUtils.fromProto(entry.getKv());
|
||||
}
|
||||
|
||||
public SingleValueArgumentEntry(AttributeValueProto entry) {
|
||||
this.ts = entry.getLastUpdateTs();
|
||||
this.version = entry.getVersion();
|
||||
if (entry.hasVersion()) {
|
||||
this.version = entry.getVersion();
|
||||
}
|
||||
this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry);
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
|
||||
public interface RuleEngineCalculatedFieldQueueService {
|
||||
|
||||
void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback<Void> callback);
|
||||
|
||||
void pushRequestToQueue(AttributesSaveRequest request, FutureCallback<Void> callback);
|
||||
|
||||
}
|
||||
@ -360,6 +360,8 @@ public interface TbContext {
|
||||
|
||||
CalculatedFieldService getCalculatedFieldService();
|
||||
|
||||
RuleEngineCalculatedFieldQueueService getCalculatedFieldQueueService();
|
||||
|
||||
boolean isExternalNodeForceAck();
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,125 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.telemetry;
|
||||
|
||||
import com.google.gson.JsonParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
name = "calculated fields",
|
||||
configClazz = EmptyNodeConfiguration.class,
|
||||
nodeDescription = "Pushes incoming messages to calculated fields service",
|
||||
nodeDetails = "Node enables the processing of calculated fields without persisting incoming messages to the database. " +
|
||||
"By default, the processing of calculated fields is triggered by the <b>save attributes</b> and <b>save time series</b> nodes. " +
|
||||
"This rule node accepts the same messages as these nodes but allows you to trigger the processing of calculated " +
|
||||
"fields independently, ensuring that derived data can be computed and utilized in real time without storing the original message in the database.",
|
||||
configDirective = "tbNodeEmptyConfig",
|
||||
icon = "published_with_changes"
|
||||
)
|
||||
public class TbCalculatedFieldsNode implements TbNode {
|
||||
|
||||
private EmptyNodeConfiguration config;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
switch (msg.getInternalType()) {
|
||||
case POST_TELEMETRY_REQUEST -> processPostTelemetryRequest(ctx, msg);
|
||||
case POST_ATTRIBUTES_REQUEST -> processPostAttributesRequest(ctx, msg);
|
||||
default -> ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostTelemetryRequest(TbContext ctx, TbMsg msg) {
|
||||
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(msg.getData()), System.currentTimeMillis());
|
||||
|
||||
if (tsKvMap.isEmpty()) {
|
||||
ctx.tellSuccess(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
List<TsKvEntry> tsKvEntryList = new ArrayList<>();
|
||||
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
|
||||
for (KvEntry kvEntry : tsKvEntry.getValue()) {
|
||||
tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
|
||||
}
|
||||
}
|
||||
|
||||
TimeseriesSaveRequest timeseriesSaveRequest = TimeseriesSaveRequest.builder()
|
||||
.tenantId(ctx.getTenantId())
|
||||
.customerId(msg.getCustomerId())
|
||||
.entityId(msg.getOriginator())
|
||||
.entries(tsKvEntryList)
|
||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||
.tbMsgId(msg.getId())
|
||||
.tbMsgType(msg.getInternalType())
|
||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||
.build();
|
||||
|
||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback());
|
||||
}
|
||||
|
||||
private void processPostAttributesRequest(TbContext ctx, TbMsg msg) {
|
||||
List<AttributeKvEntry> newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData())));
|
||||
|
||||
if (newAttributes.isEmpty()) {
|
||||
ctx.tellSuccess(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
AttributesSaveRequest attributesSaveRequest = AttributesSaveRequest.builder()
|
||||
.tenantId(ctx.getTenantId())
|
||||
.entityId(msg.getOriginator())
|
||||
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
|
||||
.entries(newAttributes)
|
||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||
.tbMsgId(msg.getId())
|
||||
.tbMsgType(msg.getInternalType())
|
||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||
.build();
|
||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback());
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user