added logic to avoid looping

This commit is contained in:
IrynaMatveieva 2024-12-23 17:00:10 +02:00
parent 2d65a6c457
commit 1482d1c4bb
8 changed files with 76 additions and 46 deletions

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -27,7 +28,7 @@ public interface CalculatedFieldExecutionService {
void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback);
void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<? extends KvEntry> telemetry);
void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, List<? extends KvEntry> telemetry);
void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback);

View File

@ -18,11 +18,7 @@ package org.thingsboard.server.service.cf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.*;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
@ -41,25 +37,8 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.TbMsg;
@ -301,7 +280,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
@Override
public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<? extends KvEntry> telemetry) {
public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, List<? extends KvEntry> telemetry) {
try {
EntityType entityType = entityId.getEntityType();
if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) {
@ -326,7 +305,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
));
if (!updatedTelemetry.isEmpty()) {
executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, updatedTelemetry);
executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, calculatedFieldIds, updatedTelemetry);
}
});
}
@ -335,7 +314,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) {
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> calculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId);
CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService));
@ -347,12 +326,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
case ASSET_PROFILE, DEVICE_PROFILE -> {
boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId);
if (isCommonEntity) {
getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues));
getOrFetchFromDBProfileEntities(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds));
} else {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, calculatedFieldIds);
}
}
default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues);
default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, calculatedFieldIds);
}
log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId);
}
@ -583,7 +562,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(List<ArgumentEntry> results) {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, Collections.emptyList());
callback.onSuccess();
}
@ -671,7 +650,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return new StringDataEntry(key, defaultValue);
}
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues) {
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues, List<CalculatedFieldId> calculatedFieldIds) {
CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()));
@ -693,7 +672,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
if (allArgsPresent.test(state.getArguments())) {
performCalculation(calculatedFieldCtx, state, entityId);
performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds);
}
}
};
@ -714,13 +693,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
performUpdateState.accept(state);
}
private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId) {
private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds) {
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedFieldCtx);
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
if (result != null) {
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result);
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, calculatedFieldIds);
}
}
@ -739,13 +718,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class);
}
private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) {
private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> calculatedFieldIds) {
try {
String type = calculatedFieldResult.getType();
TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
ObjectNode payload = createJsonPayload(calculatedFieldResult);
TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload));
if (calculatedFieldIds.contains(calculatedFieldId)) {
throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop.");
}
calculatedFieldIds.add(calculatedFieldId);
TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).calculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build();
clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null);
} catch (Exception e) {
log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e);

View File

@ -152,7 +152,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
if (request.isSaveLatest() && !request.isOnlyLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
}
calculatedFieldExecutionService.onTelemetryUpdate(tenantId, entityId, request.getEntries());
calculatedFieldExecutionService.onTelemetryUpdate(tenantId, entityId, request.getCalculatedFieldIds(), request.getEntries());
return saveFuture;
}
@ -168,7 +168,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
ListenableFuture<List<Long>> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries());
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
calculatedFieldExecutionService.onTelemetryUpdate(request.getTenantId(), request.getEntityId(), request.getEntries());
calculatedFieldExecutionService.onTelemetryUpdate(request.getTenantId(), request.getEntityId(), request.getCalculatedFieldIds(), request.getEntries());
}
@Override

View File

@ -34,6 +34,8 @@ import org.thingsboard.server.common.msg.gen.MsgProtos;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@ -64,6 +66,8 @@ public final class TbMsg implements Serializable {
private final UUID correlationId;
private final Integer partition;
private final List<CalculatedFieldId> calculatedFieldIds;
@Getter(value = AccessLevel.NONE)
@JsonIgnore
//This field is not serialized because we use queues and there is no need to do it
@ -112,7 +116,7 @@ public final class TbMsg implements Serializable {
}
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, List<CalculatedFieldId> calculatedFieldIds, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
this.id = id != null ? id : UUID.randomUUID();
this.queueName = queueName;
if (ts > 0) {
@ -139,6 +143,7 @@ public final class TbMsg implements Serializable {
this.ruleNodeId = ruleNodeId;
this.correlationId = correlationId;
this.partition = partition;
this.calculatedFieldIds = calculatedFieldIds;
this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx();
this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
}
@ -200,6 +205,7 @@ public final class TbMsg implements Serializable {
RuleNodeId ruleNodeId = null;
UUID correlationId = null;
Integer partition = null;
List<CalculatedFieldId> calculatedFieldIds = new ArrayList<>();
if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) {
customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB()));
}
@ -214,6 +220,14 @@ public final class TbMsg implements Serializable {
partition = proto.getPartition();
}
for (MsgProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(
cfIdProto.getCalculatedFieldIdMSB(),
cfIdProto.getCalculatedFieldIdLSB()
));
calculatedFieldIds.add(calculatedFieldId);
}
TbMsgProcessingCtx ctx;
if (proto.hasCtx()) {
ctx = TbMsgProcessingCtx.fromProto(proto.getCtx());
@ -224,7 +238,7 @@ public final class TbMsg implements Serializable {
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), null, proto.getType(), entityId, customerId,
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, ctx, callback);
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
@ -343,10 +357,12 @@ public final class TbMsg implements Serializable {
protected RuleNodeId ruleNodeId;
protected UUID correlationId;
protected Integer partition;
protected List<CalculatedFieldId> calculatedFieldIds;
protected TbMsgProcessingCtx ctx;
protected TbMsgCallback callback;
TbMsgBuilder() {}
TbMsgBuilder() {
}
TbMsgBuilder(TbMsg tbMsg) {
this.queueName = tbMsg.queueName;
@ -363,6 +379,7 @@ public final class TbMsg implements Serializable {
this.ruleNodeId = tbMsg.ruleNodeId;
this.correlationId = tbMsg.correlationId;
this.partition = tbMsg.partition;
this.calculatedFieldIds = tbMsg.calculatedFieldIds;
this.ctx = tbMsg.ctx;
this.callback = tbMsg.callback;
}
@ -454,6 +471,11 @@ public final class TbMsg implements Serializable {
return this;
}
public TbMsgBuilder calculatedFieldIds(List<CalculatedFieldId> calculatedFieldIds) {
this.calculatedFieldIds = calculatedFieldIds;
return this;
}
public TbMsgBuilder ctx(TbMsgProcessingCtx ctx) {
this.ctx = ctx;
return this;
@ -465,7 +487,7 @@ public final class TbMsg implements Serializable {
}
public TbMsg build() {
return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, ctx, callback);
return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback);
}
public String toString() {

View File

@ -70,4 +70,11 @@ message TbMsgProto {
int64 correlationIdMSB = 20;
int64 correlationIdLSB = 21;
int32 partition = 22;
repeated CalculatedFieldIdProto calculatedFields = 23;
}
message CalculatedFieldIdProto {
int64 calculatedFieldIdMSB = 1;
int64 calculatedFieldIdLSB = 2;
}

View File

@ -22,6 +22,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ -40,6 +41,7 @@ public class AttributesSaveRequest {
private final AttributeScope scope;
private final List<AttributeKvEntry> entries;
private final boolean notifyDevice;
private final List<CalculatedFieldId> calculatedFieldIds;
private final FutureCallback<Void> callback;
public static Builder builder() {
@ -53,6 +55,7 @@ public class AttributesSaveRequest {
private AttributeScope scope;
private List<AttributeKvEntry> entries;
private boolean notifyDevice = true;
private List<CalculatedFieldId> calculatedFieldIds;
private FutureCallback<Void> callback;
Builder() {}
@ -100,6 +103,11 @@ public class AttributesSaveRequest {
return this;
}
public Builder calculatedFieldIds(List<CalculatedFieldId> calculatedFieldIds) {
this.calculatedFieldIds = calculatedFieldIds;
return this;
}
public Builder callback(FutureCallback<Void> callback) {
this.callback = callback;
return this;
@ -120,7 +128,7 @@ public class AttributesSaveRequest {
}
public AttributesSaveRequest build() {
return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, callback);
return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, calculatedFieldIds, callback);
}
}

View File

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -40,6 +41,7 @@ public class TimeseriesSaveRequest {
private final long ttl;
private final boolean saveLatest;
private final boolean onlyLatest;
private final List<CalculatedFieldId> calculatedFieldIds;
private final FutureCallback<Void> callback;
public static Builder builder() {
@ -56,6 +58,7 @@ public class TimeseriesSaveRequest {
private FutureCallback<Void> callback;
private boolean saveLatest = true;
private boolean onlyLatest;
private List<CalculatedFieldId> calculatedFieldIds;
Builder() {}
@ -103,6 +106,11 @@ public class TimeseriesSaveRequest {
return this;
}
public Builder calculatedFieldIds(List<CalculatedFieldId> calculatedFieldIds) {
this.calculatedFieldIds = calculatedFieldIds;
return this;
}
public Builder callback(FutureCallback<Void> callback) {
this.callback = callback;
return this;
@ -123,7 +131,7 @@ public class TimeseriesSaveRequest {
}
public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback);
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, calculatedFieldIds, callback);
}
}

View File

@ -112,6 +112,7 @@ public class TbMsgTimeseriesNode implements TbNode {
.entries(tsKvEntryList)
.ttl(ttl)
.saveLatest(!config.isSkipLatestPersistence())
.calculatedFieldIds(msg.getCalculatedFieldIds())
.callback(new TelemetryNodeCallback(ctx, msg))
.build());
}