diff --git a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java
index e4fb0318b4..e0a0260f7f 100644
--- a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java
@@ -74,7 +74,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
@Before
public void beforeTest() throws Exception {
configuration = new TbMsgTimeseriesNodeConfiguration();
- configuration.setIgnoreMetadataTs(true);
+ configuration.setUseServerTs(true);
loginSysAdmin();
@@ -142,7 +142,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
TbMsgDataType.JSON,
getTbMsgData(msgValue.get(idx)));
saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx));
- saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()));
+ saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()));
idx++;
}
}
@@ -172,7 +172,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
}
void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException {
- TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()), new LongDataEntry(TOTALIZER, value));
+ TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()), new LongDataEntry(TOTALIZER, value));
saveTimeseries(entityId, tsKvEntry);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index 2334851a08..2c426dfdec 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -48,11 +48,14 @@ import java.util.concurrent.TimeUnit;
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " +
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " +
- "Enable 'ignoreMetadataTs' param to ignore the timestamp that arrives from message metadata. " +
+ "
" +
+ "Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " +
"Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" +
- "For example, if you count number of messages from multiple devices into asset time-series value. " +
- "Typically, you fetch the previous value of the counter, increment it and then save the value. " +
- "If you use timestamp of the original message, the value may be ignored, since it has outdated timestamp comparing to the previous message.",
+ "
" +
+ "In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " +
+ "However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " +
+ "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " +
+ "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeTimeseriesConfig",
icon = "file_upload"
@@ -82,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode {
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
return;
}
- long ts = computeTs(msg, config.isIgnoreMetadataTs());
+ long ts = computeTs(msg, config.isUseServerTs());
String src = msg.getData();
Map> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
if (tsKvMap.isEmpty()) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
index 6c9f7ac884..0d59c325fa 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
@@ -23,14 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration