changed msgTs to latestTs

This commit is contained in:
IrynaMatveieva 2025-06-10 09:24:49 +03:00
parent 7a169178bc
commit da90d8f727
12 changed files with 90 additions and 26 deletions

View File

@ -244,7 +244,7 @@ public class CalculatedFieldController extends BaseController {
); );
Object[] args = new Object[ctxAndArgNames.size()]; Object[] args = new Object[ctxAndArgNames.size()];
args[0] = new TbelCfCtx(arguments, getLastUpdateTimestamp(arguments)); args[0] = new TbelCfCtx(arguments, getLatestTimestamp(arguments));
for (int i = 1; i < ctxAndArgNames.size(); i++) { for (int i = 1; i < ctxAndArgNames.size(); i++) {
var arg = arguments.get(ctxAndArgNames.get(i)); var arg = arguments.get(ctxAndArgNames.get(i));
if (arg instanceof TbelCfSingleValueArg svArg) { if (arg instanceof TbelCfSingleValueArg svArg) {
@ -267,7 +267,7 @@ public class CalculatedFieldController extends BaseController {
return result; return result;
} }
private long getLastUpdateTimestamp(Map<String, TbelCfArg> arguments) { private long getLatestTimestamp(Map<String, TbelCfArg> arguments) {
long lastUpdateTimestamp = -1; long lastUpdateTimestamp = -1;
for (TbelCfArg entry : arguments.values()) { for (TbelCfArg entry : arguments.values()) {
if (entry instanceof TbelCfSingleValueArg singleValueArg) { if (entry instanceof TbelCfSingleValueArg singleValueArg) {

View File

@ -176,7 +176,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder();
if (result != null) { if (versions != null && !versions.isEmpty() && versions.get(i) != null) {
tsProtoBuilder.setVersion(versions.get(i)); tsProtoBuilder.setVersion(versions.get(i));
} }
telemetryMsg.addTsData(tsProtoBuilder.build()); telemetryMsg.addTsData(tsProtoBuilder.build());

View File

@ -35,7 +35,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected Map<String, ArgumentEntry> arguments; protected Map<String, ArgumentEntry> arguments;
protected boolean sizeExceedsLimit; protected boolean sizeExceedsLimit;
protected long lastUpdateTimestamp = -1; protected long latestTimestamp = -1;
public BaseCalculatedFieldState(List<String> requiredArguments) { public BaseCalculatedFieldState(List<String> requiredArguments) {
this.requiredArguments = requiredArguments; this.requiredArguments = requiredArguments;
@ -110,12 +110,14 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected abstract void validateNewEntry(ArgumentEntry newEntry); protected abstract void validateNewEntry(ArgumentEntry newEntry);
private void updateLastUpdateTimestamp(ArgumentEntry entry) { private void updateLastUpdateTimestamp(ArgumentEntry entry) {
long newTs = this.latestTimestamp;
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
this.lastUpdateTimestamp = singleValueArgumentEntry.getTs(); newTs = singleValueArgumentEntry.getTs();
} else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { } else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
Map.Entry<Long, Double> lastEntry = tsRollingArgumentEntry.getTsRecords().lastEntry(); Map.Entry<Long, Double> lastEntry = tsRollingArgumentEntry.getTsRecords().lastEntry();
this.lastUpdateTimestamp = (lastEntry != null) ? lastEntry.getKey() : System.currentTimeMillis(); newTs = (lastEntry != null) ? lastEntry.getKey() : System.currentTimeMillis();
} }
this.latestTimestamp = Math.max(this.latestTimestamp, newTs);
} }
} }

View File

@ -42,7 +42,7 @@ public interface CalculatedFieldState {
Map<String, ArgumentEntry> getArguments(); Map<String, ArgumentEntry> getArguments();
long getLastUpdateTimestamp(); long getLatestTimestamp();
void setRequiredArguments(List<String> requiredArguments); void setRequiredArguments(List<String> requiredArguments);

View File

@ -66,7 +66,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
args.add(arg); args.add(arg);
} }
} }
args.set(0, new TbelCfCtx(arguments, getLastUpdateTimestamp())); args.set(0, new TbelCfCtx(arguments, getLatestTimestamp()));
ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray()); ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray());
Output output = ctx.getOutput(); Output output = ctx.getOutput();
return Futures.transform(resultFuture, return Futures.transform(resultFuture,

View File

@ -87,10 +87,10 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
ObjectNode valuesNode = JacksonUtil.newObjectNode(); ObjectNode valuesNode = JacksonUtil.newObjectNode();
valuesNode.set(outputName, JacksonUtil.valueToTree(result)); valuesNode.set(outputName, JacksonUtil.valueToTree(result));
long lastTimestamp = getLastUpdateTimestamp(); long latestTs = getLatestTimestamp();
if (preserveMsgTs && lastTimestamp != -1) { if (preserveMsgTs && latestTs != -1) {
ObjectNode resultNode = JacksonUtil.newObjectNode(); ObjectNode resultNode = JacksonUtil.newObjectNode();
resultNode.put("ts", lastTimestamp); resultNode.put("ts", latestTs);
resultNode.set("values", valuesNode); resultNode.set("values", valuesNode);
return resultNode; return resultNode;
} else { } else {

View File

@ -505,6 +505,68 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
}); });
} }
@Test
public void testSimpleCalculatedFieldWhenPreserveMsgTsIsTrueAndTelemetryBeforeLatest() throws Exception {
Device testDevice = createDevice("Test device", "1234567890");
long ts = System.currentTimeMillis();
long tsA = ts - 300000L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"a\":1}}", tsA)));
long tsB = ts - 300L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"b\":5}}", tsB)));
CalculatedField calculatedField = new CalculatedField();
calculatedField.setEntityId(testDevice.getId());
calculatedField.setType(CalculatedFieldType.SIMPLE);
calculatedField.setName("a + b");
calculatedField.setDebugSettings(DebugSettings.all());
calculatedField.setConfigurationVersion(1);
SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration();
Argument argument1 = new Argument();
ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null);
argument1.setRefEntityKey(refEntityKey1);
Argument argument2 = new Argument();
ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("b", ArgumentType.TS_LATEST, null);
argument2.setRefEntityKey(refEntityKey2);
config.setArguments(Map.of("a", argument1, "b", argument2));
config.setExpression("a + b");
Output output = new Output();
output.setName("c");
output.setType(OutputType.TIME_SERIES);
config.setOutput(output);
config.setPreserveMsgTs(true);
calculatedField.setConfiguration(config);
CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode c = getLatestTelemetry(testDevice.getId(), "c");
assertThat(c).isNotNull();
assertThat(c.get("c").get(0).get("ts").asText()).isEqualTo(Long.toString(tsB));
assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("6.0");
});
long tsABeforeTsB = tsB - 300L;
doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"b\":10}}", tsABeforeTsB)));
await().alias("update telemetry with ts less than latest -> save result with latest ts").atMost(TIMEOUT, TimeUnit.SECONDS)
.pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
.untilAsserted(() -> {
ObjectNode c = getLatestTelemetry(testDevice.getId(), "c");
assertThat(c).isNotNull();
assertThat(c.get("c").get(0).get("ts").asText()).isEqualTo(Long.toString(tsB));// also tsB, since this is the latest timestamp
assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("11.0");
});
}
@Test @Test
public void testScriptCalculatedFieldWhenUsedMsgTsInScript() throws Exception { public void testScriptCalculatedFieldWhenUsedMsgTsInScript() throws Exception {
Device testDevice = createDevice("Test device", "1234567890"); Device testDevice = createDevice("Test device", "1234567890");
@ -524,7 +586,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null); ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null);
argument.setRefEntityKey(refEntityKey); argument.setRefEntityKey(refEntityKey);
config.setArguments(Map.of("T", argument)); config.setArguments(Map.of("T", argument));
config.setExpression("return {\"ts\": ctx.msgTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};"); config.setExpression("return {\"ts\": ctx.latestTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};");
Output output = new Output(); Output output = new Output();
output.setType(OutputType.TIME_SERIES); output.setType(OutputType.TIME_SERIES);

View File

@ -25,11 +25,11 @@ public class TbelCfCtx implements TbelCfObject {
@Getter @Getter
private final Map<String, TbelCfArg> args; private final Map<String, TbelCfArg> args;
@Getter @Getter
private final long msgTs; private final long latestTs;
public TbelCfCtx(Map<String, TbelCfArg> args, long lastUpdateTs) { public TbelCfCtx(Map<String, TbelCfArg> args, long latestTs) {
this.args = Collections.unmodifiableMap(args); this.args = Collections.unmodifiableMap(args);
this.msgTs = lastUpdateTs != -1 ? lastUpdateTs : System.currentTimeMillis(); this.latestTs = latestTs != -1 ? latestTs : System.currentTimeMillis();
} }
@Override @Override

View File

@ -45,7 +45,7 @@
&-key { &-key {
color: #c24c1a; color: #c24c1a;
} }
&-time-window, &-values, &-func, &-value, &-ts, &-msgTs { &-time-window, &-values, &-func, &-value, &-ts, &-latestTs {
color: #7214D0; color: #7214D0;
} }
&-start-ts, &-end-ts { &-start-ts, &-end-ts {

View File

@ -526,10 +526,10 @@ export const getCalculatedFieldArgumentsEditorCompleter = (argumentsObj: Record<
description: 'Calculated field context arguments.', description: 'Calculated field context arguments.',
children: {} children: {}
}, },
msgTs: { latestTs: {
meta: 'constant', meta: 'constant',
type: 'number', type: 'number',
description: 'Timestamp (ms) of the telemetry message that triggered the calculated field execution.' description: 'Latest timestamp (ms) of the arguments telemetry.'
} }
} }
} }
@ -582,8 +582,8 @@ const calculatedFieldArgumentsContextValueHighlightRules: AceHighlightRules = {
next: 'calculatedFieldCtxArgs' next: 'calculatedFieldCtxArgs'
}, },
{ {
token: 'tb.calculated-field-msgTs', token: 'tb.calculated-field-latestTs',
regex: /msgTs/, regex: /latestTs/,
next: 'no_regex' next: 'no_regex'
}, },
endGroupHighlightRule endGroupHighlightRule

View File

@ -1,7 +1,7 @@
## Calculated Field TBEL Script Function ## Calculated Field TBEL Script Function
The **calculate()** function is a user-defined script that enables custom calculations using [TBEL](${siteBaseUrl}/docs${docPlatformPrefix}/user-guide/tbel/) on telemetry and attribute data. The **calculate()** function is a user-defined script that enables custom calculations using [TBEL](${siteBaseUrl}/docs${docPlatformPrefix}/user-guide/tbel/) on telemetry and attribute data.
It receives arguments configured in the calculated field setup, along with an additional `ctx` object that stores `msgTs` and provides access to all arguments. It receives arguments configured in the calculated field setup, along with an additional `ctx` object that stores `latestTs` and provides access to all arguments.
### Function Signature ### Function Signature
@ -216,14 +216,14 @@ The return format depends on the output type configured in the calculated field
### Message timestamp ### Message timestamp
The `ctx` object also includes property `msgTs`, which represents the timestamp of the incoming telemetry message that triggered the calculated field execution in milliseconds. The `ctx` object also includes property `latestTs`, which represents the latest timestamp of the arguments telemetry in milliseconds.
You can use `ctx.msgTs` to set the timestamp of the resulting output explicitly when returning a time series object. You can use `ctx.latestTs` to set the timestamp of the resulting output explicitly when returning a time series object.
```javascript ```javascript
var temperatureC = (temperatureF - 32) / 1.8; var temperatureC = (temperatureF - 32) / 1.8;
return { return {
ts: ctx.msgTs, ts: ctx.latestTs,
values: { values: {
"temperatureC": toFixed(temperatureC, 2) "temperatureC": toFixed(temperatureC, 2)
} }

View File

@ -1069,7 +1069,7 @@
"delete-multiple-title": "Are you sure you want to delete { count, plural, =1 {1 calculated field} other {# calculated fields} }?", "delete-multiple-title": "Are you sure you want to delete { count, plural, =1 {1 calculated field} other {# calculated fields} }?",
"delete-multiple-text": "Be careful, after the confirmation all selected calculated fields will be removed and all related data will become unrecoverable.", "delete-multiple-text": "Be careful, after the confirmation all selected calculated fields will be removed and all related data will become unrecoverable.",
"test-with-this-message": "Test with this message", "test-with-this-message": "Test with this message",
"use-message-timestamp": "Use message timestamp", "use-message-timestamp": "Use latest timestamp",
"hint": { "hint": {
"arguments-simple-with-rolling": "Simple type calculated field should not contain keys with time series rolling type.", "arguments-simple-with-rolling": "Simple type calculated field should not contain keys with time series rolling type.",
"arguments-empty": "Arguments should not be empty.", "arguments-empty": "Arguments should not be empty.",
@ -1086,7 +1086,7 @@
"decimals-range": "Decimals by default should be a number between 0 and 15.", "decimals-range": "Decimals by default should be a number between 0 and 15.",
"expression": "Default expression demonstrates how to transform a temperature from Fahrenheit to Celsius.", "expression": "Default expression demonstrates how to transform a temperature from Fahrenheit to Celsius.",
"arguments-entity-not-found": "Argument target entity not found.", "arguments-entity-not-found": "Argument target entity not found.",
"use-message-timestamp": "If enabled, the calculated value will be persisted using the timestamp of the telemetry that triggered the calculation, instead of the server time." "use-message-timestamp": "If enabled, the calculated value will be persisted using the most recent timestamp from the arguments telemetry, instead of the server time."
} }
}, },
"confirm-on-exit": { "confirm-on-exit": {