diff --git a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java index 3257b31ca4..2dcb32cf39 100644 --- a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java +++ b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java @@ -244,7 +244,7 @@ public class CalculatedFieldController extends BaseController { ); Object[] args = new Object[ctxAndArgNames.size()]; - args[0] = new TbelCfCtx(arguments, getLastUpdateTimestamp(arguments)); + args[0] = new TbelCfCtx(arguments, getLatestTimestamp(arguments)); for (int i = 1; i < ctxAndArgNames.size(); i++) { var arg = arguments.get(ctxAndArgNames.get(i)); if (arg instanceof TbelCfSingleValueArg svArg) { @@ -267,7 +267,7 @@ public class CalculatedFieldController extends BaseController { return result; } - private long getLastUpdateTimestamp(Map arguments) { + private long getLatestTimestamp(Map arguments) { long lastUpdateTimestamp = -1; for (TbelCfArg entry : arguments.values()) { if (entry instanceof TbelCfSingleValueArg singleValueArg) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8289e4db42..81f19e1d1a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -176,7 +176,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS for (int i = 0; i < entries.size(); i++) { TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); - if (result != null) { + if (versions != null && !versions.isEmpty() && versions.get(i) != null) { tsProtoBuilder.setVersion(versions.get(i)); } telemetryMsg.addTsData(tsProtoBuilder.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index e4b03b4cab..e21d56b6d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -35,7 +35,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected Map arguments; protected boolean sizeExceedsLimit; - protected long lastUpdateTimestamp = -1; + protected long latestTimestamp = -1; public BaseCalculatedFieldState(List requiredArguments) { this.requiredArguments = requiredArguments; @@ -110,12 +110,14 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected abstract void validateNewEntry(ArgumentEntry newEntry); private void updateLastUpdateTimestamp(ArgumentEntry entry) { + long newTs = this.latestTimestamp; if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - this.lastUpdateTimestamp = singleValueArgumentEntry.getTs(); + newTs = singleValueArgumentEntry.getTs(); } else if (entry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { Map.Entry lastEntry = tsRollingArgumentEntry.getTsRecords().lastEntry(); - this.lastUpdateTimestamp = (lastEntry != null) ? lastEntry.getKey() : System.currentTimeMillis(); + newTs = (lastEntry != null) ? lastEntry.getKey() : System.currentTimeMillis(); } + this.latestTimestamp = Math.max(this.latestTimestamp, newTs); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 6eac3358ba..0de354bbb0 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -42,7 +42,7 @@ public interface CalculatedFieldState { Map getArguments(); - long getLastUpdateTimestamp(); + long getLatestTimestamp(); void setRequiredArguments(List requiredArguments); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 65ef40330c..84dce627ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -66,7 +66,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { args.add(arg); } } - args.set(0, new TbelCfCtx(arguments, getLastUpdateTimestamp())); + args.set(0, new TbelCfCtx(arguments, getLatestTimestamp())); ListenableFuture resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args.toArray()); Output output = ctx.getOutput(); return Futures.transform(resultFuture, diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index d0eba5031c..c8cdc7b4c0 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -87,10 +87,10 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { ObjectNode valuesNode = JacksonUtil.newObjectNode(); valuesNode.set(outputName, JacksonUtil.valueToTree(result)); - long lastTimestamp = getLastUpdateTimestamp(); - if (preserveMsgTs && lastTimestamp != -1) { + long latestTs = getLatestTimestamp(); + if (preserveMsgTs && latestTs != -1) { ObjectNode resultNode = JacksonUtil.newObjectNode(); - resultNode.put("ts", lastTimestamp); + resultNode.put("ts", latestTs); resultNode.set("values", valuesNode); return resultNode; } else { diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index f65f6bc629..9742c7f618 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -505,6 +505,68 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes }); } + @Test + public void testSimpleCalculatedFieldWhenPreserveMsgTsIsTrueAndTelemetryBeforeLatest() throws Exception { + Device testDevice = createDevice("Test device", "1234567890"); + long ts = System.currentTimeMillis(); + + long tsA = ts - 300000L; + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"a\":1}}", tsA))); + + long tsB = ts - 300L; + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"b\":5}}", tsB))); + + CalculatedField calculatedField = new CalculatedField(); + calculatedField.setEntityId(testDevice.getId()); + calculatedField.setType(CalculatedFieldType.SIMPLE); + calculatedField.setName("a + b"); + calculatedField.setDebugSettings(DebugSettings.all()); + calculatedField.setConfigurationVersion(1); + + SimpleCalculatedFieldConfiguration config = new SimpleCalculatedFieldConfiguration(); + + Argument argument1 = new Argument(); + ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("a", ArgumentType.TS_LATEST, null); + argument1.setRefEntityKey(refEntityKey1); + Argument argument2 = new Argument(); + ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("b", ArgumentType.TS_LATEST, null); + argument2.setRefEntityKey(refEntityKey2); + config.setArguments(Map.of("a", argument1, "b", argument2)); + config.setExpression("a + b"); + + Output output = new Output(); + output.setName("c"); + output.setType(OutputType.TIME_SERIES); + config.setOutput(output); + + config.setPreserveMsgTs(true); + + calculatedField.setConfiguration(config); + + CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); + + await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode c = getLatestTelemetry(testDevice.getId(), "c"); + assertThat(c).isNotNull(); + assertThat(c.get("c").get(0).get("ts").asText()).isEqualTo(Long.toString(tsB)); + assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("6.0"); + }); + + long tsABeforeTsB = tsB - 300L; + doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(String.format("{\"ts\": %s, \"values\": {\"b\":10}}", tsABeforeTsB))); + + await().alias("update telemetry with ts less than latest -> save result with latest ts").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) + .untilAsserted(() -> { + ObjectNode c = getLatestTelemetry(testDevice.getId(), "c"); + assertThat(c).isNotNull(); + assertThat(c.get("c").get(0).get("ts").asText()).isEqualTo(Long.toString(tsB));// also tsB, since this is the latest timestamp + assertThat(c.get("c").get(0).get("value").asText()).isEqualTo("11.0"); + }); + } + @Test public void testScriptCalculatedFieldWhenUsedMsgTsInScript() throws Exception { Device testDevice = createDevice("Test device", "1234567890"); @@ -524,7 +586,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes ReferencedEntityKey refEntityKey = new ReferencedEntityKey("temperature", ArgumentType.TS_LATEST, null); argument.setRefEntityKey(refEntityKey); config.setArguments(Map.of("T", argument)); - config.setExpression("return {\"ts\": ctx.msgTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};"); + config.setExpression("return {\"ts\": ctx.latestTs, \"values\": {\"fahrenheitTemp\": (T * 1.8) + 32}};"); Output output = new Output(); output.setType(OutputType.TIME_SERIES); diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java index 7515cb5269..c6023154ea 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfCtx.java @@ -25,11 +25,11 @@ public class TbelCfCtx implements TbelCfObject { @Getter private final Map args; @Getter - private final long msgTs; + private final long latestTs; - public TbelCfCtx(Map args, long lastUpdateTs) { + public TbelCfCtx(Map args, long latestTs) { this.args = Collections.unmodifiableMap(args); - this.msgTs = lastUpdateTs != -1 ? lastUpdateTs : System.currentTimeMillis(); + this.latestTs = latestTs != -1 ? latestTs : System.currentTimeMillis(); } @Override diff --git a/ui-ngx/src/app/modules/home/components/calculated-fields/components/dialog/calculated-field-dialog.component.scss b/ui-ngx/src/app/modules/home/components/calculated-fields/components/dialog/calculated-field-dialog.component.scss index efcd62efd4..e192e3ccc0 100644 --- a/ui-ngx/src/app/modules/home/components/calculated-fields/components/dialog/calculated-field-dialog.component.scss +++ b/ui-ngx/src/app/modules/home/components/calculated-fields/components/dialog/calculated-field-dialog.component.scss @@ -45,7 +45,7 @@ &-key { color: #c24c1a; } - &-time-window, &-values, &-func, &-value, &-ts, &-msgTs { + &-time-window, &-values, &-func, &-value, &-ts, &-latestTs { color: #7214D0; } &-start-ts, &-end-ts { diff --git a/ui-ngx/src/app/shared/models/calculated-field.models.ts b/ui-ngx/src/app/shared/models/calculated-field.models.ts index 6fade1d193..76b775b2fd 100644 --- a/ui-ngx/src/app/shared/models/calculated-field.models.ts +++ b/ui-ngx/src/app/shared/models/calculated-field.models.ts @@ -526,10 +526,10 @@ export const getCalculatedFieldArgumentsEditorCompleter = (argumentsObj: Record< description: 'Calculated field context arguments.', children: {} }, - msgTs: { + latestTs: { meta: 'constant', type: 'number', - description: 'Timestamp (ms) of the telemetry message that triggered the calculated field execution.' + description: 'Latest timestamp (ms) of the arguments telemetry.' } } } @@ -582,8 +582,8 @@ const calculatedFieldArgumentsContextValueHighlightRules: AceHighlightRules = { next: 'calculatedFieldCtxArgs' }, { - token: 'tb.calculated-field-msgTs', - regex: /msgTs/, + token: 'tb.calculated-field-latestTs', + regex: /latestTs/, next: 'no_regex' }, endGroupHighlightRule diff --git a/ui-ngx/src/assets/help/en_US/calculated-field/expression_fn.md b/ui-ngx/src/assets/help/en_US/calculated-field/expression_fn.md index 4c54b01499..1d45dea3c8 100644 --- a/ui-ngx/src/assets/help/en_US/calculated-field/expression_fn.md +++ b/ui-ngx/src/assets/help/en_US/calculated-field/expression_fn.md @@ -1,7 +1,7 @@ ## Calculated Field TBEL Script Function The **calculate()** function is a user-defined script that enables custom calculations using [TBEL](${siteBaseUrl}/docs${docPlatformPrefix}/user-guide/tbel/) on telemetry and attribute data. -It receives arguments configured in the calculated field setup, along with an additional `ctx` object that stores `msgTs` and provides access to all arguments. +It receives arguments configured in the calculated field setup, along with an additional `ctx` object that stores `latestTs` and provides access to all arguments. ### Function Signature @@ -216,14 +216,14 @@ The return format depends on the output type configured in the calculated field ### Message timestamp -The `ctx` object also includes property `msgTs`, which represents the timestamp of the incoming telemetry message that triggered the calculated field execution in milliseconds. +The `ctx` object also includes property `latestTs`, which represents the latest timestamp of the arguments telemetry in milliseconds. -You can use `ctx.msgTs` to set the timestamp of the resulting output explicitly when returning a time series object. +You can use `ctx.latestTs` to set the timestamp of the resulting output explicitly when returning a time series object. ```javascript var temperatureC = (temperatureF - 32) / 1.8; return { - ts: ctx.msgTs, + ts: ctx.latestTs, values: { "temperatureC": toFixed(temperatureC, 2) } diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 5afc040daf..a0bb9c6052 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -1069,7 +1069,7 @@ "delete-multiple-title": "Are you sure you want to delete { count, plural, =1 {1 calculated field} other {# calculated fields} }?", "delete-multiple-text": "Be careful, after the confirmation all selected calculated fields will be removed and all related data will become unrecoverable.", "test-with-this-message": "Test with this message", - "use-message-timestamp": "Use message timestamp", + "use-message-timestamp": "Use latest timestamp", "hint": { "arguments-simple-with-rolling": "Simple type calculated field should not contain keys with time series rolling type.", "arguments-empty": "Arguments should not be empty.", @@ -1086,7 +1086,7 @@ "decimals-range": "Decimals by default should be a number between 0 and 15.", "expression": "Default expression demonstrates how to transform a temperature from Fahrenheit to Celsius.", "arguments-entity-not-found": "Argument target entity not found.", - "use-message-timestamp": "If enabled, the calculated value will be persisted using the timestamp of the telemetry that triggered the calculation, instead of the server time." + "use-message-timestamp": "If enabled, the calculated value will be persisted using the most recent timestamp from the arguments telemetry, instead of the server time." } }, "confirm-on-exit": {