diff --git a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java index 508272085c..d3ad80f614 100644 --- a/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/service/ServiceSqlTestSuite.java @@ -23,6 +23,7 @@ import org.thingsboard.server.queue.memory.InMemoryStorage; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ "org.thingsboard.server.service.resource.sql.*Test", + "org.thingsboard.server.service.sql.*Test" }) public class ServiceSqlTestSuite { diff --git a/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java new file mode 100644 index 0000000000..9e6b6f0bc1 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/sql/SequentialTimeseriesPersistenceTest.java @@ -0,0 +1,194 @@ +package org.thingsboard.server.service.sql; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.controller.AbstractControllerTest; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.dao.timeseries.TimeseriesService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +@Slf4j +public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest { + + static final int TIMEOUT = 30; + + final String TOTALIZER = "Totalizer"; + final int TTL = 99999; + final String GENERIC_CUMULATIVE_OBJ = "genericCumulativeObj"; + final List ts = List.of(10L, 20L, 30L, 40L, 60L, 70L, 50L, 80L); + final List msgValue = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); + + @Autowired + TimeseriesService timeseriesService; + + TbMsgTimeseriesNodeConfiguration configuration; + Tenant savedTenant; + User tenantAdmin; + + @Before + public void beforeTest() throws Exception { + configuration = new TbMsgTimeseriesNodeConfiguration(); + configuration.setIgnoreMetadataTs(true); + + loginSysAdmin(); + + Tenant tenant = new Tenant(); + tenant.setTitle("My tenant"); + savedTenant = doPost("/api/tenant", tenant, Tenant.class); + Assert.assertNotNull(savedTenant); + + tenantAdmin = new User(); + tenantAdmin.setAuthority(Authority.TENANT_ADMIN); + tenantAdmin.setTenantId(savedTenant.getId()); + tenantAdmin.setEmail("tenant2@thingsboard.org"); + tenantAdmin.setFirstName("Joe"); + tenantAdmin.setLastName("Downs"); + + tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); + } + + @After + public void afterTest() throws Exception { + loginSysAdmin(); + doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); + } + + @Test + public void testSequentialTimeseriesPersistence() throws Exception { + Asset asset = saveAsset("Asset"); + + Device deviceA = saveDevice("Device A"); + Device deviceB = saveDevice("Device B"); + Device deviceC = saveDevice("Device C"); + Device deviceD = saveDevice("Device D"); + List devices = List.of(deviceA, deviceB, deviceC, deviceD); + + for (int i = 0; i < 2; i++) { + int idx = i * (devices.size()); + saveLatestTsForAssetAndDevice(devices, asset, idx); + checkDiffBetweenLatestTsForDevicesAndAsset(devices, asset); + } + } + + Device saveDevice(String name) throws Exception { + Device device = new Device(); + device.setName(name); + device.setType("default"); + Device savedDevice = doPost("/api/device", device, Device.class); + Assert.assertNotNull(savedDevice); + return savedDevice; + } + + Asset saveAsset(String name) throws Exception { + Asset asset = new Asset(); + asset.setName(name); + asset.setType("default"); + Asset savedAsset = doPost("/api/asset", asset, Asset.class); + Assert.assertNotNull(savedAsset); + return savedAsset; + } + + private void saveLatestTsForAssetAndDevice(List devices, Asset asset, int idx) throws ExecutionException, InterruptedException, TimeoutException { + for (Device device : devices) { + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), + device.getId(), + getTbMsgMetadata(device.getName(), ts.get(idx)), + TbMsgDataType.JSON, + getTbMsgData(msgValue.get(idx))); + saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx)); + saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs())); + idx++; + } + } + + void checkDiffBetweenLatestTsForDevicesAndAsset(List devices, Asset asset) throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry assetTsKvEntry = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ); + Assert.assertTrue(assetTsKvEntry.getJsonValue().isPresent()); + JsonObject assetJsonObject = new JsonParser().parse(assetTsKvEntry.getJsonValue().get()).getAsJsonObject(); + for (Device device : devices) { + Long assetValue = assetJsonObject.get(device.getName()).getAsLong(); + TsKvEntry deviceLatest = getTsKvLatest(device.getId(), TOTALIZER); + Assert.assertTrue(deviceLatest.getLongValue().isPresent()); + Long deviceValue = deviceLatest.getLongValue().get(); + Assert.assertEquals(assetValue, deviceValue); + } + } + + String getTbMsgData(long value) { + return "{\"Totalizer\": " + value + "}"; + } + + TbMsgMetaData getTbMsgMetadata(String name, long ts) { + Map metadata = new HashMap<>(); + metadata.put("deviceName", name); + metadata.put("ts", String.valueOf(ts)); + return new TbMsgMetaData(metadata); + } + + void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()), new LongDataEntry(TOTALIZER, value)); + saveTimeseries(entityId, tsKvEntry); + } + + void saveAssetTsEntry(Asset asset, String key, long value, long ts) throws ExecutionException, InterruptedException, TimeoutException { + Optional tsKvEntryOpt = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ).getJsonValue(); + TsKvEntry saveTsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(GENERIC_CUMULATIVE_OBJ, getJsonObject(key, value, tsKvEntryOpt).toString())); + saveTimeseries(asset.getId(), saveTsKvEntry); + } + + @NotNull + private JsonObject getJsonObject(String key, long value, Optional tsKvEntryOpt) { + JsonObject jsonObject = new JsonObject(); + if (tsKvEntryOpt.isPresent()) { + jsonObject = new JsonParser().parse(tsKvEntryOpt.get()).getAsJsonObject(); + } + jsonObject.addProperty(key, value); + return jsonObject; + } + + private void saveTimeseries(EntityId entityId, TsKvEntry saveTsKvEntry) throws InterruptedException, ExecutionException, TimeoutException { + timeseriesService.save(savedTenant.getId(), entityId, List.of(saveTsKvEntry), TTL).get(TIMEOUT, TimeUnit.SECONDS); + } + + TsKvEntry getTsKvLatest(EntityId entityId, String key) throws InterruptedException, ExecutionException, TimeoutException { + List tsKvEntries = timeseriesService.findLatest( + savedTenant.getTenantId(), + entityId, + List.of(key)).get(TIMEOUT, TimeUnit.SECONDS); + Assert.assertEquals(1, tsKvEntries.size()); + return tsKvEntries.get(0); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 711b7c1558..47cf0f1bf2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -77,7 +77,7 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); return; } - long ts = config.isSaveWithMsgTs() ? msg.getTs() : getTs(msg); + long ts = computeTs(msg, config.isIgnoreMetadataTs()); String src = msg.getData(); Map> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); if (tsKvMap.isEmpty()) { @@ -102,6 +102,10 @@ public class TbMsgTimeseriesNode implements TbNode { } } + public static long computeTs(TbMsg msg, boolean saveWithMsgTs) { + return saveWithMsgTs ? System.currentTimeMillis() : getTs(msg); + } + public static long getTs(TbMsg msg) { long ts = -1; String tsStr = msg.getMetaData().getValue("ts"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 89abac99fe..6c9f7ac884 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -23,14 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration