improve node details and improve field naming
This commit is contained in:
parent
93bf95c17a
commit
659d8abf35
@ -74,7 +74,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
|
|||||||
@Before
|
@Before
|
||||||
public void beforeTest() throws Exception {
|
public void beforeTest() throws Exception {
|
||||||
configuration = new TbMsgTimeseriesNodeConfiguration();
|
configuration = new TbMsgTimeseriesNodeConfiguration();
|
||||||
configuration.setIgnoreMetadataTs(true);
|
configuration.setUseServerTs(true);
|
||||||
|
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
@ -142,7 +142,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
|
|||||||
TbMsgDataType.JSON,
|
TbMsgDataType.JSON,
|
||||||
getTbMsgData(msgValue.get(idx)));
|
getTbMsgData(msgValue.get(idx)));
|
||||||
saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx));
|
saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx));
|
||||||
saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()));
|
saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()));
|
||||||
idx++;
|
idx++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -172,7 +172,7 @@ public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest
|
|||||||
}
|
}
|
||||||
|
|
||||||
void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException {
|
void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()), new LongDataEntry(TOTALIZER, value));
|
TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()), new LongDataEntry(TOTALIZER, value));
|
||||||
saveTimeseries(entityId, tsKvEntry);
|
saveTimeseries(entityId, tsKvEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -48,11 +48,14 @@ import java.util.concurrent.TimeUnit;
|
|||||||
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
|
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
|
||||||
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " +
|
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " +
|
||||||
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " +
|
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " +
|
||||||
"Enable 'ignoreMetadataTs' param to ignore the timestamp that arrives from message metadata. " +
|
"<br/>" +
|
||||||
|
"Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " +
|
||||||
"Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" +
|
"Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" +
|
||||||
"For example, if you count number of messages from multiple devices into asset time-series value. " +
|
"<br/>" +
|
||||||
"Typically, you fetch the previous value of the counter, increment it and then save the value. " +
|
"In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " +
|
||||||
"If you use timestamp of the original message, the value may be ignored, since it has outdated timestamp comparing to the previous message.",
|
"However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " +
|
||||||
|
"The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " +
|
||||||
|
"So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
|
||||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||||
configDirective = "tbActionNodeTimeseriesConfig",
|
configDirective = "tbActionNodeTimeseriesConfig",
|
||||||
icon = "file_upload"
|
icon = "file_upload"
|
||||||
@ -82,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
|
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
long ts = computeTs(msg, config.isIgnoreMetadataTs());
|
long ts = computeTs(msg, config.isUseServerTs());
|
||||||
String src = msg.getData();
|
String src = msg.getData();
|
||||||
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
|
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
|
||||||
if (tsKvMap.isEmpty()) {
|
if (tsKvMap.isEmpty()) {
|
||||||
|
|||||||
@ -23,14 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
|
|||||||
|
|
||||||
private long defaultTTL;
|
private long defaultTTL;
|
||||||
private boolean skipLatestPersistence;
|
private boolean skipLatestPersistence;
|
||||||
private boolean ignoreMetadataTs;
|
private boolean useServerTs;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
|
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
|
||||||
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
|
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
|
||||||
configuration.setDefaultTTL(0L);
|
configuration.setDefaultTTL(0L);
|
||||||
configuration.setSkipLatestPersistence(false);
|
configuration.setSkipLatestPersistence(false);
|
||||||
configuration.setIgnoreMetadataTs(false);
|
configuration.setUseServerTs(false);
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user