implemented calculated field debug events persistence

This commit is contained in:
IrynaMatveieva 2025-01-31 11:29:38 +02:00
parent a4aa2444ac
commit c1c9fb4f5a
16 changed files with 194 additions and 77 deletions

View File

@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -105,6 +106,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
@ -129,6 +131,7 @@ import org.thingsboard.server.service.transport.TbCoreToTransportService;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -744,29 +747,34 @@ public class ActorSystemContext {
} }
} }
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, String> arguments, TbMsg tbMsg, Throwable error) { public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
if (checkLimits(tenantId, tbMsg, error)) { try {
try { CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder event = CalculatedFieldDebugEvent.builder() .tenantId(tenantId)
.tenantId(tenantId) .entityId(entityId.getId())
.entityId(entityId.getId()) .serviceId(getServiceId())
.serviceId(getServiceId()) .calculatedFieldId(calculatedFieldId)
.calculatedFieldId(calculatedFieldId) .eventEntity(entityId);
.eventEntity(tbMsg.getOriginator()) if (tbMsgId != null) {
.msgId(tbMsg.getId()) eventBuilder.msgId(tbMsgId);
.msgType(tbMsg.getType())
.arguments(JacksonUtil.toString(arguments))
.result(tbMsg.getData());
if (error != null) {
event.error(toString(error));
}
ListenableFuture<Void> future = eventService.saveAsync(event.build());
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
} catch (IllegalArgumentException ex) {
log.warn("Failed to persist calculated field debug message", ex);
} }
if (tbMsgType != null) {
eventBuilder.msgType(tbMsgType.name());
}
if (arguments != null) {
eventBuilder.arguments(JacksonUtil.toString(arguments));
}
if (result != null) {
eventBuilder.result(result);
}
if (error != null) {
eventBuilder.error(toString(error));
}
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
} catch (IllegalArgumentException ex) {
log.warn("Failed to persist calculated field debug message", ex);
} }
} }

View File

@ -18,19 +18,18 @@ package org.thingsboard.server.actors.calculatedField;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@ -53,9 +52,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
@ -111,9 +108,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
callback.onSuccess(CALLBACKS_PER_CF); callback.onSuccess(CALLBACKS_PER_CF);
} else { } else {
if (proto.getTsDataCount() > 0) { if (proto.getTsDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList())); processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getAttrDataCount() > 0) { } else if (proto.getAttrDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList())); processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else { } else {
callback.onSuccess(CALLBACKS_PER_CF); callback.onSuccess(CALLBACKS_PER_CF);
} }
@ -136,27 +133,30 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
@SneakyThrows @SneakyThrows
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) { private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList())); processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
} }
@SneakyThrows @SneakyThrows
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) { private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList())); processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} }
@SneakyThrows @SneakyThrows
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback, private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
Map<String, ArgumentEntry> newArgValues) { Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) {
if (newArgValues.isEmpty()) { if (newArgValues.isEmpty()) {
callback.onSuccess(CALLBACKS_PER_CF); callback.onSuccess(CALLBACKS_PER_CF);
} }
CalculatedFieldState state = getOrInitState(ctx); CalculatedFieldState state = getOrInitState(ctx);
if (state.updateState(newArgValues)) { if (state.updateState(newArgValues)) {
if (state.isReady()) { if (state.isReady() && ctx.isInitialized()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
cfIdList = new ArrayList<>(cfIdList); cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId()); cfIdList.add(ctx.getCfId());
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResultMap()), null);
}
} else { } else {
callback.onSuccess(); // State was updated but no calculation performed; callback.onSuccess(); // State was updated but no calculation performed;
} }
@ -183,13 +183,27 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return state; return state;
} }
private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) {
if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) {
return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB());
}
return null;
}
private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) {
if (!proto.getTbMsgType().isEmpty()) {
return TbMsgType.valueOf(proto.getTbMsgType());
}
return null;
}
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) { private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {
return mapToArguments(ctx.getMainEntityArguments(), data); return mapToArguments(ctx.getMainEntityArguments(), data);
} }
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List<TsKvProto> data) { private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List<TsKvProto> data) {
var argNames = ctx.getLinkedEntityArguments().get(entityId); var argNames = ctx.getLinkedEntityArguments().get(entityId);
if(argNames.isEmpty()) { if (argNames.isEmpty()) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
return mapToArguments(argNames, data); return mapToArguments(argNames, data);
@ -221,7 +235,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) { private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
var argNames = ctx.getLinkedEntityArguments().get(entityId); var argNames = ctx.getLinkedEntityArguments().get(entityId);
if(argNames.isEmpty()) { if (argNames.isEmpty()) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
return mapToArguments(argNames, scope, attrDataList); return mapToArguments(argNames, scope, attrDataList);

View File

@ -16,16 +16,14 @@
package org.thingsboard.server.actors.calculatedField; package org.thingsboard.server.actors.calculatedField;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.AssetProfileId;
@ -38,16 +36,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.discovery.HashPartitionService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@ -101,6 +90,13 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onFieldInitMsg(CalculatedFieldInitMsg msg) { public void onFieldInitMsg(CalculatedFieldInitMsg msg) {
var cf = msg.getCf(); var cf = msg.getCf();
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService());
try {
cfCtx.init();
} catch (Exception e) {
if (DebugModeUtil.isDebugAllAvailable(cf)) {
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e);
}
}
calculatedFields.put(cf.getId(), cfCtx); calculatedFields.put(cf.getId(), cfCtx);
// We use copy on write lists to safely pass the reference to another actor for the iteration. // We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)

View File

@ -833,7 +833,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
List<TsKvEntry> entries = request.getEntries(); List<TsKvEntry> entries = request.getEntries();
List<Long> versions = result.getVersions(); List<Long> versions = result.getVersions();
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
@ -849,7 +849,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) { private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries(); List<AttributeKvEntry> entries = request.getEntries();
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
@ -862,7 +862,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return msg.build(); return msg.build();
} }
private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds) { private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) {
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder();
telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); telemetryMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
@ -878,6 +878,15 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} }
} }
if (tbMsgId != null) {
telemetryMsg.setTbMsgIdMSB(tbMsgId.getMostSignificantBits());
telemetryMsg.setTbMsgIdLSB(tbMsgId.getLeastSignificantBits());
}
if (tbMsgType != null) {
telemetryMsg.setTbMsgType(tbMsgType.name());
}
return telemetryMsg; return telemetryMsg;
} }

View File

@ -15,19 +15,18 @@
*/ */
package org.thingsboard.server.service.cf.ctx.state; package org.thingsboard.server.service.cf.ctx.state;
import lombok.NoArgsConstructor;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@NoArgsConstructor
public abstract class BaseCalculatedFieldState implements CalculatedFieldState { public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected List<String> requiredArguments; protected List<String> requiredArguments;
protected Map<String, ArgumentEntry> arguments; protected Map<String, ArgumentEntry> arguments;
public BaseCalculatedFieldState() {
this.arguments = new HashMap<>();
}
public BaseCalculatedFieldState(List<String> requiredArguments) { public BaseCalculatedFieldState(List<String> requiredArguments) {
this.requiredArguments = requiredArguments; this.requiredArguments = requiredArguments;
this.arguments = new HashMap<>(); this.arguments = new HashMap<>();
@ -35,7 +34,12 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
@Override @Override
public Map<String, ArgumentEntry> getArguments() { public Map<String, ArgumentEntry> getArguments() {
return this.arguments; return arguments;
}
@Override
public List<String> getRequiredArguments() {
return requiredArguments;
} }
@Override @Override
@ -53,7 +57,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
if (existingEntry == null) { if (existingEntry == null) {
validateNewEntry(newEntry); validateNewEntry(newEntry);
arguments.put(key, newEntry.copy()); arguments.put(key, newEntry);
stateUpdated = true; stateUpdated = true;
} else { } else {
stateUpdated = existingEntry.updateEntry(newEntry); stateUpdated = existingEntry.updateEntry(newEntry);
@ -70,7 +74,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
!arguments.containsValue(TsRollingArgumentEntry.EMPTY); !arguments.containsValue(TsRollingArgumentEntry.EMPTY);
} }
protected void validateNewEntry(ArgumentEntry newEntry) { protected abstract void validateNewEntry(ArgumentEntry newEntry);
}
} }

View File

@ -17,6 +17,8 @@ package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data; import lombok.Data;
import net.objecthunter.exp4j.Expression; import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
import org.mvel2.MVEL;
import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedField;
@ -33,7 +35,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,6 +46,8 @@ import java.util.stream.Collectors;
@Data @Data
public class CalculatedFieldCtx { public class CalculatedFieldCtx {
private CalculatedField calculatedField;
private CalculatedFieldId cfId; private CalculatedFieldId cfId;
private TenantId tenantId; private TenantId tenantId;
private EntityId entityId; private EntityId entityId;
@ -61,7 +64,11 @@ public class CalculatedFieldCtx {
private CalculatedFieldScriptEngine calculatedFieldScriptEngine; private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
private ThreadLocal<Expression> customExpression; private ThreadLocal<Expression> customExpression;
private boolean initialized;
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) {
this.calculatedField = calculatedField;
this.cfId = calculatedField.getId(); this.cfId = calculatedField.getId();
this.tenantId = calculatedField.getTenantId(); this.tenantId = calculatedField.getTenantId();
this.entityId = calculatedField.getEntityId(); this.entityId = calculatedField.getEntityId();
@ -88,10 +95,28 @@ public class CalculatedFieldCtx {
this.output = configuration.getOutput(); this.output = configuration.getOutput();
this.expression = configuration.getExpression(); this.expression = configuration.getExpression();
this.tbelInvokeService = tbelInvokeService; this.tbelInvokeService = tbelInvokeService;
if (CalculatedFieldType.SCRIPT.equals(calculatedField.getType())) { }
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
public void init() {
if (CalculatedFieldType.SCRIPT.equals(cfType)) {
try {
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
initialized = true;
} catch (Exception e) {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
}
} else { } else {
this.customExpression = new ThreadLocal<>(); if (isValidExpression(expression)) {
this.customExpression = ThreadLocal.withInitial(() ->
new ExpressionBuilder(expression)
.implicitMultiplication(true)
.variables(this.arguments.keySet())
.build()
);
initialized = true;
} else {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
}
} }
} }
@ -108,6 +133,15 @@ public class CalculatedFieldCtx {
); );
} }
private boolean isValidExpression(String expression) {
try {
MVEL.compileExpression(expression);
return true;
} catch (Exception e) {
return false;
}
}
public boolean matches(List<AttributeKvEntry> values, AttributeScope scope) { public boolean matches(List<AttributeKvEntry> values, AttributeScope scope) {
return matchesAttributes(mainEntityArguments, values, scope); return matchesAttributes(mainEntityArguments, values, scope);
} }

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.List;
import java.util.Map; import java.util.Map;
@JsonTypeInfo( @JsonTypeInfo(
@ -40,9 +41,12 @@ public interface CalculatedFieldState {
Map<String, ArgumentEntry> getArguments(); Map<String, ArgumentEntry> getArguments();
List<String> getRequiredArguments();
boolean updateState(Map<String, ArgumentEntry> argumentValues); boolean updateState(Map<String, ArgumentEntry> argumentValues);
ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx); ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx);
@JsonIgnore
boolean isReady(); boolean isReady();
} }

View File

@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.Argument;
@ -31,6 +32,7 @@ import java.util.TreeMap;
@Data @Data
@Slf4j @Slf4j
@NoArgsConstructor
public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
public ScriptCalculatedFieldState(List<String> requiredArguments) { public ScriptCalculatedFieldState(List<String> requiredArguments) {
@ -42,6 +44,10 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
return CalculatedFieldType.SCRIPT; return CalculatedFieldType.SCRIPT;
} }
@Override
protected void validateNewEntry(ArgumentEntry newEntry) {
}
@Override @Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) { public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
arguments.forEach((key, argumentEntry) -> { arguments.forEach((key, argumentEntry) -> {

View File

@ -18,8 +18,7 @@ package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data; import lombok.Data;
import net.objecthunter.exp4j.Expression; import lombok.NoArgsConstructor;
import net.objecthunter.exp4j.ExpressionBuilder;
import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.CalculatedFieldResult;
@ -28,6 +27,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@Data @Data
@NoArgsConstructor
public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
public SimpleCalculatedFieldState(List<String> requiredArguments) { public SimpleCalculatedFieldState(List<String> requiredArguments) {
@ -48,16 +48,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
@Override @Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) { public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
String expression = ctx.getExpression(); var expr = ctx.getCustomExpression().get();
ThreadLocal<Expression> customExpression = ctx.getCustomExpression();
var expr = customExpression.get();
if (expr == null) {
expr = new ExpressionBuilder(expression)
.implicitMultiplication(true)
.variables(this.arguments.keySet())
.build();
customExpression.set(expr);
}
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) { for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
try { try {

View File

@ -58,6 +58,10 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
ActionType actionType = calculatedField.getId() == null ? ActionType.ADDED : ActionType.UPDATED; ActionType actionType = calculatedField.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
TenantId tenantId = calculatedField.getTenantId(); TenantId tenantId = calculatedField.getTenantId();
try { try {
if (ActionType.UPDATED.equals(actionType)) {
CalculatedField existingCf = calculatedFieldService.findById(tenantId, calculatedField.getId());
checkForEntityChange(existingCf, calculatedField);
}
checkCalculatedFieldNumber(tenantId, calculatedField.getEntityId()); checkCalculatedFieldNumber(tenantId, calculatedField.getEntityId());
checkEntityExistence(tenantId, calculatedField.getEntityId()); checkEntityExistence(tenantId, calculatedField.getEntityId());
checkArgumentSize(calculatedField.getConfiguration()); checkArgumentSize(calculatedField.getConfiguration());
@ -98,6 +102,12 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
} }
} }
private void checkForEntityChange(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) {
if (!oldCalculatedField.getEntityId().equals(newCalculatedField.getEntityId())) {
throw new IllegalArgumentException("Changing the calculated field target entity after initialization is prohibited.");
}
}
private void checkEntityExistence(TenantId tenantId, EntityId entityId) { private void checkEntityExistence(TenantId tenantId, EntityId entityId) {
switch (entityId.getEntityType()) { switch (entityId.getEntityType()) {
case ASSET, DEVICE, ASSET_PROFILE, DEVICE_PROFILE -> case ASSET, DEVICE, ASSET_PROFILE, DEVICE_PROFILE ->

View File

@ -799,6 +799,9 @@ message CalculatedFieldTelemetryMsgProto {
repeated TsKvProto tsData = 9; repeated TsKvProto tsData = 9;
AttributeScopeProto scope = 10; AttributeScopeProto scope = 10;
repeated AttributeValueProto attrData = 11; repeated AttributeValueProto attrData = 11;
int64 tbMsgIdMSB = 12;
int64 tbMsgIdLSB = 13;
string tbMsgType = 14;
} }
message CalculatedFieldLinkedTelemetryMsgProto { message CalculatedFieldLinkedTelemetryMsgProto {

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration; import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.debug.DebugSettings;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CalculatedFieldLinkId; import org.thingsboard.server.common.data.id.CalculatedFieldLinkId;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
@ -78,6 +79,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
int configurationVersion = (int) row.get("configuration_version"); int configurationVersion = (int) row.get("configuration_version");
JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration")); JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration"));
long version = row.get("version") != null ? (long) row.get("version") : 0; long version = row.get("version") != null ? (long) row.get("version") : 0;
String debugSettings = (String) row.get("debug_settings");
Object externalIdObj = row.get("external_id"); Object externalIdObj = row.get("external_id");
CalculatedField calculatedField = new CalculatedField(); CalculatedField calculatedField = new CalculatedField();
@ -90,6 +92,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
calculatedField.setConfigurationVersion(configurationVersion); calculatedField.setConfigurationVersion(configurationVersion);
calculatedField.setConfiguration(JacksonUtil.treeToValue(configuration, CalculatedFieldConfiguration.class)); calculatedField.setConfiguration(JacksonUtil.treeToValue(configuration, CalculatedFieldConfiguration.class));
calculatedField.setVersion(version); calculatedField.setVersion(version);
calculatedField.setDebugSettings(JacksonUtil.fromString(debugSettings, DebugSettings.class));
calculatedField.setExternalId(externalIdObj != null ? new CalculatedFieldId(UUID.fromString((String) externalIdObj)) : null); calculatedField.setExternalId(externalIdObj != null ? new CalculatedFieldId(UUID.fromString((String) externalIdObj)) : null);
return calculatedField; return calculatedField;

View File

@ -28,8 +28,10 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import java.util.List; import java.util.List;
import java.util.UUID;
@Getter @Getter
@ToString @ToString
@ -42,6 +44,8 @@ public class AttributesSaveRequest {
private final List<AttributeKvEntry> entries; private final List<AttributeKvEntry> entries;
private final boolean notifyDevice; private final boolean notifyDevice;
private final List<CalculatedFieldId> previousCalculatedFieldIds; private final List<CalculatedFieldId> previousCalculatedFieldIds;
private final UUID tbMsgId;
private final TbMsgType tbMsgType;
private final FutureCallback<Void> callback; private final FutureCallback<Void> callback;
public static Builder builder() { public static Builder builder() {
@ -56,6 +60,8 @@ public class AttributesSaveRequest {
private List<AttributeKvEntry> entries; private List<AttributeKvEntry> entries;
private boolean notifyDevice = true; private boolean notifyDevice = true;
private List<CalculatedFieldId> previousCalculatedFieldIds; private List<CalculatedFieldId> previousCalculatedFieldIds;
private UUID tbMsgId;
private TbMsgType tbMsgType;
private FutureCallback<Void> callback; private FutureCallback<Void> callback;
Builder() {} Builder() {}
@ -108,6 +114,16 @@ public class AttributesSaveRequest {
return this; return this;
} }
public Builder tbMsgId(UUID tbMsgId) {
this.tbMsgId = tbMsgId;
return this;
}
public Builder tbMsgType(TbMsgType tbMsgType) {
this.tbMsgType = tbMsgType;
return this;
}
public Builder callback(FutureCallback<Void> callback) { public Builder callback(FutureCallback<Void> callback) {
this.callback = callback; this.callback = callback;
return this; return this;
@ -128,7 +144,7 @@ public class AttributesSaveRequest {
} }
public AttributesSaveRequest build() { public AttributesSaveRequest build() {
return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, previousCalculatedFieldIds, callback); return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback);
} }
} }

View File

@ -27,8 +27,10 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import java.util.List; import java.util.List;
import java.util.UUID;
@Getter @Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE) @AllArgsConstructor(access = AccessLevel.PRIVATE)
@ -42,6 +44,8 @@ public class TimeseriesSaveRequest {
private final boolean saveLatest; private final boolean saveLatest;
private final boolean onlyLatest; private final boolean onlyLatest;
private final List<CalculatedFieldId> previousCalculatedFieldIds; private final List<CalculatedFieldId> previousCalculatedFieldIds;
private final UUID tbMsgId;
private final TbMsgType tbMsgType;
private final FutureCallback<Void> callback; private final FutureCallback<Void> callback;
public static Builder builder() { public static Builder builder() {
@ -59,6 +63,8 @@ public class TimeseriesSaveRequest {
private boolean saveLatest = true; private boolean saveLatest = true;
private boolean onlyLatest; private boolean onlyLatest;
private List<CalculatedFieldId> previousCalculatedFieldIds; private List<CalculatedFieldId> previousCalculatedFieldIds;
private UUID tbMsgId;
private TbMsgType tbMsgType;
Builder() {} Builder() {}
@ -111,6 +117,16 @@ public class TimeseriesSaveRequest {
return this; return this;
} }
public Builder tbMsgId(UUID tbMsgId) {
this.tbMsgId = tbMsgId;
return this;
}
public Builder tbMsgType(TbMsgType tbMsgType) {
this.tbMsgType = tbMsgType;
return this;
}
public Builder callback(FutureCallback<Void> callback) { public Builder callback(FutureCallback<Void> callback) {
this.callback = callback; this.callback = callback;
return this; return this;
@ -131,7 +147,7 @@ public class TimeseriesSaveRequest {
} }
public TimeseriesSaveRequest build() { public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, previousCalculatedFieldIds, callback); return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback);
} }
} }

View File

@ -126,6 +126,8 @@ public class TbMsgAttributesNode implements TbNode {
.entries(attributes) .entries(attributes)
.notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY))) .notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)))
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(callback) .callback(callback)
.build()); .build());
} }

View File

@ -113,6 +113,8 @@ public class TbMsgTimeseriesNode implements TbNode {
.ttl(ttl) .ttl(ttl)
.saveLatest(!config.isSkipLatestPersistence()) .saveLatest(!config.isSkipLatestPersistence())
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(new TelemetryNodeCallback(ctx, msg)) .callback(new TelemetryNodeCallback(ctx, msg))
.build()); .build());
} }