added ability to preserve last update ts for calculated value
This commit is contained in:
parent
cedc7d2e46
commit
1869296ff2
@ -36,6 +36,8 @@ import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.script.api.tbel.TbelCfArg;
|
||||
import org.thingsboard.script.api.tbel.TbelCfCtx;
|
||||
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
|
||||
import org.thingsboard.script.api.tbel.TbelCfTsDoubleVal;
|
||||
import org.thingsboard.script.api.tbel.TbelCfTsRollingArg;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EventInfo;
|
||||
@ -242,9 +244,8 @@ public class CalculatedFieldController extends BaseController {
|
||||
ctxAndArgNames.toArray(String[]::new)
|
||||
);
|
||||
|
||||
|
||||
Object[] args = new Object[ctxAndArgNames.size()];
|
||||
args[0] = new TbelCfCtx(arguments);
|
||||
args[0] = new TbelCfCtx(arguments, getLastUpdateTimestamp(arguments));
|
||||
for (int i = 1; i < ctxAndArgNames.size(); i++) {
|
||||
var arg = arguments.get(ctxAndArgNames.get(i));
|
||||
if (arg instanceof TbelCfSingleValueArg svArg) {
|
||||
@ -267,6 +268,20 @@ public class CalculatedFieldController extends BaseController {
|
||||
return result;
|
||||
}
|
||||
|
||||
private long getLastUpdateTimestamp(Map<String, TbelCfArg> arguments) {
|
||||
long lastUpdateTimestamp = -1;
|
||||
for (TbelCfArg entry : arguments.values()) {
|
||||
if (entry instanceof TbelCfSingleValueArg singleValueArg) {
|
||||
long ts = singleValueArg.getTs();
|
||||
lastUpdateTimestamp = Math.max(lastUpdateTimestamp, ts);
|
||||
} else if (entry instanceof TbelCfTsRollingArg tsRollingArg) {
|
||||
long maxTs = tsRollingArg.getValues().stream().mapToLong(TbelCfTsDoubleVal::getTs).max().orElse(-1);
|
||||
lastUpdateTimestamp = Math.max(lastUpdateTimestamp, maxTs);
|
||||
}
|
||||
}
|
||||
return lastUpdateTimestamp;
|
||||
}
|
||||
|
||||
private <E extends HasId<I> & HasTenantId, I extends EntityId> void checkReferencedEntities(CalculatedFieldConfiguration calculatedFieldConfig, SecurityUser user) throws ThingsboardException {
|
||||
List<EntityId> referencedEntityIds = calculatedFieldConfig.getReferencedEntities();
|
||||
for (EntityId referencedEntityId : referencedEntityIds) {
|
||||
|
||||
@ -35,13 +35,15 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
||||
protected Map<String, ArgumentEntry> arguments;
|
||||
protected boolean sizeExceedsLimit;
|
||||
|
||||
protected long lastUpdateTimestamp = -1;
|
||||
|
||||
public BaseCalculatedFieldState(List<String> requiredArguments) {
|
||||
this.requiredArguments = requiredArguments;
|
||||
this.arguments = new HashMap<>();
|
||||
}
|
||||
|
||||
public BaseCalculatedFieldState() {
|
||||
this(new ArrayList<>(), new HashMap<>(), false);
|
||||
this(new ArrayList<>(), new HashMap<>(), false, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -59,14 +61,21 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
||||
checkArgumentSize(key, newEntry, ctx);
|
||||
|
||||
ArgumentEntry existingEntry = arguments.get(key);
|
||||
boolean entryUpdated;
|
||||
|
||||
if (existingEntry == null || newEntry.isForceResetPrevious()) {
|
||||
validateNewEntry(newEntry);
|
||||
arguments.put(key, newEntry);
|
||||
stateUpdated = true;
|
||||
entryUpdated = true;
|
||||
} else {
|
||||
stateUpdated = existingEntry.updateEntry(newEntry);
|
||||
entryUpdated = existingEntry.updateEntry(newEntry);
|
||||
}
|
||||
|
||||
if (entryUpdated) {
|
||||
stateUpdated = true;
|
||||
updateLastUpdateTimestamp(newEntry);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return stateUpdated;
|
||||
@ -100,4 +109,17 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
||||
|
||||
protected abstract void validateNewEntry(ArgumentEntry newEntry);
|
||||
|
||||
private void updateLastUpdateTimestamp(ArgumentEntry entry) {
|
||||
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
|
||||
long ts = singleValueArgumentEntry.getTs();
|
||||
this.lastUpdateTimestamp = Math.max(this.lastUpdateTimestamp, ts);
|
||||
} else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
|
||||
Map.Entry<Long, Double> lastEntry = tsRollingArgumentEntry.getTsRecords().pollLastEntry();
|
||||
if (lastEntry != null) {
|
||||
long ts = lastEntry.getKey();
|
||||
this.lastUpdateTimestamp = Math.max(this.lastUpdateTimestamp, ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
|
||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
|
||||
import org.thingsboard.server.common.data.cf.configuration.Output;
|
||||
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
|
||||
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -61,6 +62,7 @@ public class CalculatedFieldCtx {
|
||||
private final List<String> argNames;
|
||||
private Output output;
|
||||
private String expression;
|
||||
private boolean preserveLatestTs;
|
||||
private TbelInvokeService tbelInvokeService;
|
||||
private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
|
||||
private ThreadLocal<Expression> customExpression;
|
||||
@ -94,6 +96,7 @@ public class CalculatedFieldCtx {
|
||||
this.argNames = new ArrayList<>(arguments.keySet());
|
||||
this.output = configuration.getOutput();
|
||||
this.expression = configuration.getExpression();
|
||||
this.preserveLatestTs = CalculatedFieldType.SIMPLE.equals(calculatedField.getType()) && ((SimpleCalculatedFieldConfiguration) configuration).isPreserveLastUpdateTs();
|
||||
this.tbelInvokeService = tbelInvokeService;
|
||||
|
||||
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
|
||||
|
||||
@ -42,6 +42,8 @@ public interface CalculatedFieldState {
|
||||
|
||||
Map<String, ArgumentEntry> getArguments();
|
||||
|
||||
long getLastUpdateTimestamp();
|
||||
|
||||
void setRequiredArguments(List<String> requiredArguments);
|
||||
|
||||
boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues);
|
||||
|
||||
@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.cf.configuration.Output;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -67,7 +66,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
|
||||
args.add(arg);
|
||||
}
|
||||
}
|
||||
args.set(0, new TbelCfCtx(arguments));
|
||||
args.set(0, new TbelCfCtx(arguments, getLastUpdateTimestamp()));
|
||||
ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray());
|
||||
Output output = ctx.getOutput();
|
||||
return Futures.transform(resultFuture,
|
||||
|
||||
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.Data;
|
||||
@ -65,19 +67,34 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
||||
double expressionResult = expr.evaluate();
|
||||
|
||||
Output output = ctx.getOutput();
|
||||
Object result;
|
||||
Integer decimals = output.getDecimalsByDefault();
|
||||
if (decimals != null) {
|
||||
if (decimals.equals(0)) {
|
||||
result = TbUtils.toInt(expressionResult);
|
||||
} else {
|
||||
result = TbUtils.toFixed(expressionResult, decimals);
|
||||
}
|
||||
} else {
|
||||
result = expressionResult;
|
||||
Object result = formatResult(expressionResult, output.getDecimalsByDefault());
|
||||
JsonNode outputResult = createResultJson(ctx.isPreserveLatestTs(), output.getName(), result);
|
||||
|
||||
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), outputResult));
|
||||
}
|
||||
|
||||
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), JacksonUtil.valueToTree(Map.of(output.getName(), result))));
|
||||
private Object formatResult(double expressionResult, Integer decimals) {
|
||||
if (decimals == null) {
|
||||
return expressionResult;
|
||||
}
|
||||
return decimals.equals(0)
|
||||
? TbUtils.toInt(expressionResult)
|
||||
: TbUtils.toFixed(expressionResult, decimals);
|
||||
}
|
||||
|
||||
private JsonNode createResultJson(boolean preserveLatestTs, String outputName, Object result) {
|
||||
ObjectNode valuesNode = JacksonUtil.newObjectNode();
|
||||
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
|
||||
|
||||
long lastTimestamp = getLastUpdateTimestamp();
|
||||
if (preserveLatestTs && lastTimestamp != -1) {
|
||||
ObjectNode resultNode = JacksonUtil.newObjectNode();
|
||||
resultNode.put("ts", lastTimestamp);
|
||||
resultNode.set("values", valuesNode);
|
||||
return resultNode;
|
||||
} else {
|
||||
return valuesNode;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -23,6 +23,8 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class SimpleCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration {
|
||||
|
||||
private boolean preserveLastUpdateTs;
|
||||
|
||||
@Override
|
||||
public CalculatedFieldType getType() {
|
||||
return CalculatedFieldType.SIMPLE;
|
||||
|
||||
@ -24,9 +24,12 @@ public class TbelCfCtx implements TbelCfObject {
|
||||
|
||||
@Getter
|
||||
private final Map<String, TbelCfArg> args;
|
||||
@Getter
|
||||
private final long lastTs;
|
||||
|
||||
public TbelCfCtx(Map<String, TbelCfArg> args) {
|
||||
public TbelCfCtx(Map<String, TbelCfArg> args, long lastUpdateTs) {
|
||||
this.args = Collections.unmodifiableMap(args);
|
||||
this.lastTs = lastUpdateTs != -1 ? lastUpdateTs : System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user