From 659d8abf35fa6f41024f70f299b88e96ce96e302 Mon Sep 17 00:00:00 2001 From: van-vanich Date: Wed, 5 Jan 2022 18:15:16 +0200 Subject: [PATCH] improve node details and improve field naming --- .../sql/SequentialTimeseriesPersistenceTest.java | 6 +++--- .../rule/engine/telemetry/TbMsgTimeseriesNode.java | 13 ++++++++----- .../telemetry/TbMsgTimeseriesNodeConfiguration.java | 4 ++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java index e4fb0318b4..e0a0260f7f 100644 --- a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java @@ -74,7 +74,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest @Before public void beforeTest() throws Exception { configuration = new TbMsgTimeseriesNodeConfiguration(); - configuration.setIgnoreMetadataTs(true); + configuration.setUseServerTs(true); loginSysAdmin(); @@ -142,7 +142,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest TbMsgDataType.JSON, getTbMsgData(msgValue.get(idx))); saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx)); - saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs())); + saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs())); idx++; } } @@ -172,7 +172,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest } void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException { - TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()), new LongDataEntry(TOTALIZER, value)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()), new LongDataEntry(TOTALIZER, value)); saveTimeseries(entityId, tsKvEntry); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 2334851a08..2c426dfdec 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -48,11 +48,14 @@ import java.util.concurrent.TimeUnit; nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " + "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " + "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " + - "Enable 'ignoreMetadataTs' param to ignore the timestamp that arrives from message metadata. " + + "
" + + "Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " + "Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" + - "For example, if you count number of messages from multiple devices into asset time-series value. " + - "Typically, you fetch the previous value of the counter, increment it and then save the value. " + - "If you use timestamp of the original message, the value may be ignored, since it has outdated timestamp comparing to the previous message.", + "
" + + "In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " + + "However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " + + "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " + + "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeTimeseriesConfig", icon = "file_upload" @@ -82,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); return; } - long ts = computeTs(msg, config.isIgnoreMetadataTs()); + long ts = computeTs(msg, config.isUseServerTs()); String src = msg.getData(); Map> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); if (tsKvMap.isEmpty()) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 6c9f7ac884..0d59c325fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -23,14 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration