diff --git a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java index 508272085c..d3ad80f614 100644 --- a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java @@ -23,6 +23,7 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ "org.thingsboard.server.service.resource.sql.*Test", + "org.thingsboard.server.service.sql.*Test" }) public class ServiceSqlTestSuite { diff --git a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java new file mode 100644 index 0000000000..e0a0260f7f --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java @@ -0,0 +1,207 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.sql; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.controller.AbstractControllerTest; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.dao.timeseries.TimeseriesService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest { + + static final int TIMEOUT = 30; + + final String TOTALIZER = "Totalizer"; + final int TTL = 99999; + final String GENERIC_CUMULATIVE_OBJ = "genericCumulativeObj"; + final List ts = List.of(10L, 20L, 30L, 40L, 60L, 70L, 50L, 80L); + final List msgValue = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); + + @Autowired + TimeseriesService timeseriesService; + + TbMsgTimeseriesNodeConfiguration configuration; + Tenant savedTenant; + User tenantAdmin; + + @Before + public void beforeTest() throws Exception { + configuration = new TbMsgTimeseriesNodeConfiguration(); + configuration.setUseServerTs(true); + + loginSysAdmin(); + + Tenant tenant = new Tenant(); + tenant.setTitle("My tenant"); + savedTenant = doPost("/api/tenant", tenant, Tenant.class); + Assert.assertNotNull(savedTenant); + + tenantAdmin = new User(); + tenantAdmin.setAuthority(Authority.TENANT_ADMIN); + tenantAdmin.setTenantId(savedTenant.getId()); + tenantAdmin.setEmail("tenant2@thingsboard.org"); + tenantAdmin.setFirstName("Joe"); + tenantAdmin.setLastName("Downs"); + + tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); + } + + @After + public void afterTest() throws Exception { + loginSysAdmin(); + doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); + } + + @Test + public void testSequentialTimeseriesPersistence() throws Exception { + Asset asset = saveAsset("Asset"); + + Device deviceA = saveDevice("Device A"); + Device deviceB = saveDevice("Device B"); + Device deviceC = saveDevice("Device C"); + Device deviceD = saveDevice("Device D"); + List devices = List.of(deviceA, deviceB, deviceC, deviceD); + + for (int i = 0; i < 2; i++) { + int idx = i * devices.size(); + saveLatestTsForAssetAndDevice(devices, asset, idx); + checkDiffBetweenLatestTsForDevicesAndAsset(devices, asset); + } + } + + Device saveDevice(String name) throws Exception { + Device device = new Device(); + device.setName(name); + device.setType("default"); + Device savedDevice = doPost("/api/device", device, Device.class); + Assert.assertNotNull(savedDevice); + return savedDevice; + } + + Asset saveAsset(String name) throws Exception { + Asset asset = new Asset(); + asset.setName(name); + asset.setType("default"); + Asset savedAsset = doPost("/api/asset", asset, Asset.class); + Assert.assertNotNull(savedAsset); + return savedAsset; + } + + void saveLatestTsForAssetAndDevice(List devices, Asset asset, int idx) throws ExecutionException, InterruptedException, TimeoutException { + for (Device device : devices) { + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), + device.getId(), + getTbMsgMetadata(device.getName(), ts.get(idx)), + TbMsgDataType.JSON, + getTbMsgData(msgValue.get(idx))); + saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx)); + saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs())); + idx++; + } + } + + void checkDiffBetweenLatestTsForDevicesAndAsset(List devices, Asset asset) throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry assetTsKvEntry = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ); + Assert.assertTrue(assetTsKvEntry.getJsonValue().isPresent()); + JsonObject assetJsonObject = new JsonParser().parse(assetTsKvEntry.getJsonValue().get()).getAsJsonObject(); + for (Device device : devices) { + Long assetValue = assetJsonObject.get(device.getName()).getAsLong(); + TsKvEntry deviceLatest = getTsKvLatest(device.getId(), TOTALIZER); + Assert.assertTrue(deviceLatest.getLongValue().isPresent()); + Long deviceValue = deviceLatest.getLongValue().get(); + Assert.assertEquals(assetValue, deviceValue); + } + } + + String getTbMsgData(long value) { + return "{\"Totalizer\": " + value + "}"; + } + + TbMsgMetaData getTbMsgMetadata(String name, long ts) { + Map metadata = new HashMap<>(); + metadata.put("deviceName", name); + metadata.put("ts", String.valueOf(ts)); + return new TbMsgMetaData(metadata); + } + + void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()), new LongDataEntry(TOTALIZER, value)); + saveTimeseries(entityId, tsKvEntry); + } + + void saveAssetTsEntry(Asset asset, String key, long value, long ts) throws ExecutionException, InterruptedException, TimeoutException { + Optional tsKvEntryOpt = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ).getJsonValue(); + TsKvEntry saveTsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(GENERIC_CUMULATIVE_OBJ, getJsonObject(key, value, tsKvEntryOpt).toString())); + saveTimeseries(asset.getId(), saveTsKvEntry); + } + + @NotNull + JsonObject getJsonObject(String key, long value, Optional tsKvEntryOpt) { + JsonObject jsonObject = new JsonObject(); + if (tsKvEntryOpt.isPresent()) { + jsonObject = new JsonParser().parse(tsKvEntryOpt.get()).getAsJsonObject(); + } + jsonObject.addProperty(key, value); + return jsonObject; + } + + void saveTimeseries(EntityId entityId, TsKvEntry saveTsKvEntry) throws InterruptedException, ExecutionException, TimeoutException { + timeseriesService.save(savedTenant.getId(), entityId, List.of(saveTsKvEntry), TTL).get(TIMEOUT, TimeUnit.SECONDS); + } + + TsKvEntry getTsKvLatest(EntityId entityId, String key) throws InterruptedException, ExecutionException, TimeoutException { + List tsKvEntries = timeseriesService.findLatest( + savedTenant.getTenantId(), + entityId, + List.of(key)).get(TIMEOUT, TimeUnit.SECONDS); + Assert.assertEquals(1, tsKvEntries.size()); + return tsKvEntries.get(0); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 1d52b7cdfa..2c426dfdec 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -46,8 +46,16 @@ import java.util.concurrent.TimeUnit; configClazz = TbMsgTimeseriesNodeConfiguration.class, nodeDescription = "Saves timeseries data", nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " + - "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' timestamp will be applied. " + - "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.", + "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " + + "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " + + "
" + + "Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " + + "Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" + + "
" + + "In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " + + "However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " + + "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " + + "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeTimeseriesConfig", icon = "file_upload" @@ -77,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); return; } - long ts = getTs(msg); + long ts = computeTs(msg, config.isUseServerTs()); String src = msg.getData(); Map> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); if (tsKvMap.isEmpty()) { @@ -102,6 +110,10 @@ public class TbMsgTimeseriesNode implements TbNode { } } + public static long computeTs(TbMsg msg, boolean ignoreMetadataTs) { + return ignoreMetadataTs ? System.currentTimeMillis() : getTs(msg); + } + public static long getTs(TbMsg msg) { long ts = -1; String tsStr = msg.getMetaData().getValue("ts"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index bb661df905..0d59c325fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -23,12 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration