Merge pull request #13227 from irynamatveieva/improvements/calculated-fields

Added ability to preserve last update ts for calculated value
This commit is contained in:
Viacheslav Klimov 2025-04-28 15:00:25 +03:00 committed by GitHub
commit 63faeacc69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 207 additions and 40 deletions

View File

@ -36,6 +36,8 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.script.api.tbel.TbelCfCtx;
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
import org.thingsboard.script.api.tbel.TbelCfTsDoubleVal;
import org.thingsboard.script.api.tbel.TbelCfTsRollingArg;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EventInfo;
@ -242,9 +244,8 @@ public class CalculatedFieldController extends BaseController {
ctxAndArgNames.toArray(String[]::new)
);
Object[] args = new Object[ctxAndArgNames.size()];
args[0] = new TbelCfCtx(arguments);
args[0] = new TbelCfCtx(arguments, getLastUpdateTimestamp(arguments));
for (int i = 1; i < ctxAndArgNames.size(); i++) {
var arg = arguments.get(ctxAndArgNames.get(i));
if (arg instanceof TbelCfSingleValueArg svArg) {
@ -267,6 +268,20 @@ public class CalculatedFieldController extends BaseController {
return result;
}
private long getLastUpdateTimestamp(Map<String, TbelCfArg> arguments) {
long lastUpdateTimestamp = -1;
for (TbelCfArg entry : arguments.values()) {
if (entry instanceof TbelCfSingleValueArg singleValueArg) {
long ts = singleValueArg.getTs();
lastUpdateTimestamp = Math.max(lastUpdateTimestamp, ts);
} else if (entry instanceof TbelCfTsRollingArg tsRollingArg) {
long maxTs = tsRollingArg.getValues().stream().mapToLong(TbelCfTsDoubleVal::getTs).max().orElse(-1);
lastUpdateTimestamp = Math.max(lastUpdateTimestamp, maxTs);
}
}
return lastUpdateTimestamp;
}
private <E extends HasId<I> & HasTenantId, I extends EntityId> void checkReferencedEntities(CalculatedFieldConfiguration calculatedFieldConfig, SecurityUser user) throws ThingsboardException {
List<EntityId> referencedEntityIds = calculatedFieldConfig.getReferencedEntities();
for (EntityId referencedEntityId : referencedEntityIds) {

View File

@ -35,13 +35,15 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected Map<String, ArgumentEntry> arguments;
protected boolean sizeExceedsLimit;
protected long lastUpdateTimestamp = -1;
public BaseCalculatedFieldState(List<String> requiredArguments) {
this.requiredArguments = requiredArguments;
this.arguments = new HashMap<>();
}
public BaseCalculatedFieldState() {
this(new ArrayList<>(), new HashMap<>(), false);
this(new ArrayList<>(), new HashMap<>(), false, -1);
}
@Override
@ -59,14 +61,21 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
checkArgumentSize(key, newEntry, ctx);
ArgumentEntry existingEntry = arguments.get(key);
boolean entryUpdated;
if (existingEntry == null || newEntry.isForceResetPrevious()) {
validateNewEntry(newEntry);
arguments.put(key, newEntry);
stateUpdated = true;
entryUpdated = true;
} else {
stateUpdated = existingEntry.updateEntry(newEntry);
entryUpdated = existingEntry.updateEntry(newEntry);
}
if (entryUpdated) {
stateUpdated = true;
updateLastUpdateTimestamp(newEntry);
}
}
return stateUpdated;
@ -100,4 +109,15 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected abstract void validateNewEntry(ArgumentEntry newEntry);
private void updateLastUpdateTimestamp(ArgumentEntry entry) {
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
this.lastUpdateTimestamp = singleValueArgumentEntry.getTs();
} else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
Map.Entry<Long, Double> lastEntry = tsRollingArgumentEntry.getTsRecords().pollLastEntry();
if (lastEntry != null) {
this.lastUpdateTimestamp = lastEntry.getKey();
}
}
}
}

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -61,6 +62,7 @@ public class CalculatedFieldCtx {
private final List<String> argNames;
private Output output;
private String expression;
private boolean preserveMsgTs;
private TbelInvokeService tbelInvokeService;
private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
private ThreadLocal<Expression> customExpression;
@ -94,6 +96,7 @@ public class CalculatedFieldCtx {
this.argNames = new ArrayList<>(arguments.keySet());
this.output = configuration.getOutput();
this.expression = configuration.getExpression();
this.preserveMsgTs = CalculatedFieldType.SIMPLE.equals(calculatedField.getType()) && ((SimpleCalculatedFieldConfiguration) configuration).isPreserveMsgTs();
this.tbelInvokeService = tbelInvokeService;
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);

View File

@ -42,6 +42,8 @@ public interface CalculatedFieldState {
Map<String, ArgumentEntry> getArguments();
long getLastUpdateTimestamp();
void setRequiredArguments(List<String> requiredArguments);
boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues);

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -67,7 +66,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
args.add(arg);
}
}
args.set(0, new TbelCfCtx(arguments));
args.set(0, new TbelCfCtx(arguments, getLastUpdateTimestamp()));
ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray());
Output output = ctx.getOutput();
return Futures.transform(resultFuture,

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
@ -65,19 +67,35 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
double expressionResult = expr.evaluate();
Output output = ctx.getOutput();
Object result;
Integer decimals = output.getDecimalsByDefault();
if (decimals != null) {
if (decimals.equals(0)) {
result = TbUtils.toInt(expressionResult);
} else {
result = TbUtils.toFixed(expressionResult, decimals);
}
} else {
result = expressionResult;
}
Object result = formatResult(expressionResult, output.getDecimalsByDefault());
JsonNode outputResult = createResultJson(ctx.isPreserveMsgTs(), output.getName(), result);
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), JacksonUtil.valueToTree(Map.of(output.getName(), result))));
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), outputResult));
}
private Object formatResult(double expressionResult, Integer decimals) {
if (decimals == null) {
return expressionResult;
}
if (decimals.equals(0)) {
return TbUtils.toInt(expressionResult);
}
return TbUtils.toFixed(expressionResult, decimals);
}
private JsonNode createResultJson(boolean preserveMsgTs, String outputName, Object result) {
ObjectNode valuesNode = JacksonUtil.newObjectNode();
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
long lastTimestamp = getLastUpdateTimestamp();
if (preserveMsgTs && lastTimestamp != -1) {
ObjectNode resultNode = JacksonUtil.newObjectNode();
resultNode.put("ts", lastTimestamp);
resultNode.set("values", valuesNode);
return resultNode;
} else {
return valuesNode;
}
}
}

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.debug.DebugSettings;
import org.thingsboard.server.common.data.id.AssetProfileId;
@ -462,6 +463,87 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
});
}
@Test
public void testSimpleCalculatedFieldWhenPreserveMsgTsIsTrue() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
long ts = System.currentTimeMillis() - 300000L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts)));
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SIMPLE);
calculatedField.setName("C to F");
calculatedField.setDebugSettings(DebugSettings.all());
calculatedField.setConfigurationVersion(1);
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
Argument argument = new Argument();
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null);
argument.setRefEntityKey(refEntityKey);
config.setArguments(Map.of("T", argument));
config.setExpression("(T * 9/5) + 32");
Output output = new Output();
output.setName("fahrenheitTemp");
output.setType(OutputType.TIME_SERIES);
config.setOutput(output);
config.setPreserveMsgTs(true);
calculatedField.setConfiguration(config);
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts));
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
}
@Test
public void testScriptCalculatedFieldWhenUsedMsgTsInScript() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
long ts = System.currentTimeMillis() - 300000L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"temperature\":30}}", ts)));
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SCRIPT);
calculatedField.setName("C to F");
calculatedField.setDebugSettings(DebugSettings.all());
calculatedField.setConfigurationVersion(1);
ScriptCalculatedFieldConfiguration config = new ScriptCalculatedFieldConfiguration();
Argument argument = new Argument();
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null);
argument.setRefEntityKey(refEntityKey);
config.setArguments(Map.of("T", argument));
config.setExpression("return {\"ts\": ctx.msgTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};");
Output output = new Output();
output.setType(OutputType.TIME_SERIES);
config.setOutput(output);
calculatedField.setConfiguration(config);
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
assertThat(fahrenheitTemp).isNotNull();
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("ts").asText()).isEqualTo(Long.toString(ts));
assertThat(fahrenheitTemp.get("fahrenheitTemp").get(0).get("value").asText()).isEqualTo("86.0");
});
}
private ObjectNode getLatestTelemetry(EntityId entityId, String... keys) throws Exception {
return doGetAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/values/timeseries?keys=" + String.join(",", keys), ObjectNode.class);
}

View File

@ -23,6 +23,8 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldType;
@EqualsAndHashCode(callSuper = true)
public class SimpleCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration {
private boolean preserveMsgTs;
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.SIMPLE;

View File

@ -24,9 +24,12 @@ public class TbelCfCtx implements TbelCfObject {
@Getter
private final Map<String, TbelCfArg> args;
@Getter
private final long msgTs;
public TbelCfCtx(Map<String, TbelCfArg> args) {
public TbelCfCtx(Map<String, TbelCfArg> args, long lastUpdateTs) {
this.args = Collections.unmodifiableMap(args);
this.msgTs = lastUpdateTs != -1 ? lastUpdateTs : System.currentTimeMillis();
}
@Override

View File

@ -1,7 +1,7 @@
## Calculated Field TBEL Script Function
The **calculate()** function is a user-defined script that enables custom calculations using [TBEL](${siteBaseUrl}/docs${docPlatformPrefix}/user-guide/tbel/) on telemetry and attribute data.
It receives arguments configured in the calculated field setup, along with an additional `ctx` object that provides access to all arguments.
It receives arguments configured in the calculated field setup, along with an additional `ctx` object that stores `msgTs` and provides access to all arguments.
### Function Signature
@ -44,7 +44,7 @@ Let's modify the function that converts Fahrenheit to Celsius to also return the
var temperatureC = (temperatureF - 32) / 1.8;
return {
"ts": ctx.args.temperatureF.ts,
"values": { "temperatureC": toFixed(temperatureC, 2) }
"values": {"temperatureC": toFixed(temperatureC, 2)}
};
```
@ -88,7 +88,7 @@ foreach(t: temperature) {
}
// iterate through all values and calculate the sum using for loop:
sum = 0.0;
for(var i = 0; i < temperature.values.size; i++) {
for (var i = 0; i < temperature.values.size; i++) {
sum += temperature.values[i].value;
}
// use built-in function to calculate the sum
@ -146,12 +146,13 @@ function calculate(ctx, altitude, temperature) {
Time series rolling arguments can be **merged** to align timestamps across multiple datasets.
| Method | Description | Returns | Example |
|:-----------------------------|:--------------------------------------------------------------------------------------------------------------------------|:----------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `merge(other, settings)` | Merges with another rolling argument. Aligns timestamps and filling missing values with the previous available value. | Merged object with `timeWindow` and aligned values. | <span tb-help-popup="calculated-field/examples/merge-functions/merge_input" tb-help-popup-placement="top" trigger-text="Input"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_usage" tb-help-popup-placement="top" trigger-text="Usage"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_output" tb-help-popup-placement="top" trigger-text="Output"></span> |
| `mergeAll(others, settings)` | Merges multiple rolling arguments. Aligns timestamps and filling missing values with the previous available value. | Merged object with `timeWindow` and aligned values. | <span tb-help-popup="calculated-field/examples/merge-functions/merge_input" tb-help-popup-placement="top" trigger-text="Input"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_all_usage" tb-help-popup-placement="top" trigger-text="Usage"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_all_output" tb-help-popup-placement="top" trigger-text="Output"></span> |
| Method | Description | Returns | Example |
|:-----------------------------|:----------------------------------------------------------------------------------------------------------------------|:----------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `merge(other, settings)` | Merges with another rolling argument. Aligns timestamps and filling missing values with the previous available value. | Merged object with `timeWindow` and aligned values. | <span tb-help-popup="calculated-field/examples/merge-functions/merge_input" tb-help-popup-placement="top" trigger-text="Input"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_usage" tb-help-popup-placement="top" trigger-text="Usage"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_output" tb-help-popup-placement="top" trigger-text="Output"></span> |
| `mergeAll(others, settings)` | Merges multiple rolling arguments. Aligns timestamps and filling missing values with the previous available value. | Merged object with `timeWindow` and aligned values. | <span tb-help-popup="calculated-field/examples/merge-functions/merge_input" tb-help-popup-placement="top" trigger-text="Input"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_all_usage" tb-help-popup-placement="top" trigger-text="Usage"></span> <br> <span tb-help-popup="calculated-field/examples/merge-functions/merge_all_output" tb-help-popup-placement="top" trigger-text="Output"></span> |
##### Parameters
| Parameter | Description |
|:---------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `other` or `others` | Another rolling argument or array of rolling arguments to merge with. |
@ -187,29 +188,51 @@ function calculate(ctx, temperature, defrost) {
The result is a list of issues that may be used to configure alarm rules:
```json
[{
[
{
"ts": 1741613833843,
"values": {
"issue": {
"temperature": -3.12,
"defrostState": false
}
"issue": {
"temperature": -3.12,
"defrostState": false
}
}
}, {
},
{
"ts": 1741613923848,
"values": {
"issue": {
"temperature": -4.16,
"defrostState": false
}
"issue": {
"temperature": -4.16,
"defrostState": false
}
}
}]
}
]
```
### Function return format
The return format depends on the output type configured in the calculated field settings (default: **Time Series**).
### Message timestamp
The `ctx` object also includes property `msgTs`, which represents the timestamp of the incoming telemetry message that triggered the calculated field execution in milliseconds.
You can use `ctx.msgTs` to set the timestamp of the resulting output explicitly when returning a time series object.
```javascript
var temperatureC = (temperatureF - 32) / 1.8;
return {
ts: ctx.msgTs,
values: {
"temperatureC": toFixed(temperatureC, 2)
}
}
```
This ensures that the calculated data point aligns with the timestamp of the triggering telemetry.
##### Time Series Output
The function must return a JSON object or array with or without a timestamp.
@ -246,7 +269,7 @@ With timestamp:
"someArray": [1,2,3],
"someNestedObject": {"key": "value"}
}
}
}
}
```
@ -265,7 +288,7 @@ Array containing multiple timestamps and different values of the `airDensity` :
"values": {
"airDensity": 1.07
}
}
}
]
```