add test for sequential timeseries persistence and improve code for TbMsgTimeseriesNode rule node and config
This commit is contained in:
		
							parent
							
								
									ff56e0e622
								
							
						
					
					
						commit
						bb3c730e58
					
				@ -23,6 +23,7 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
 | 
			
		||||
@RunWith(ClasspathSuite.class)
 | 
			
		||||
@ClasspathSuite.ClassnameFilters({
 | 
			
		||||
        "org.thingsboard.server.service.resource.sql.*Test",
 | 
			
		||||
        "org.thingsboard.server.service.sql.*Test"
 | 
			
		||||
})
 | 
			
		||||
public class ServiceSqlTestSuite {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,194 @@
 | 
			
		||||
package org.thingsboard.server.service.sql;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.JsonObject;
 | 
			
		||||
import com.google.gson.JsonParser;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.jetbrains.annotations.NotNull;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.Tenant;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.asset.Asset;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.JsonDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.security.Authority;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgDataType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
import org.thingsboard.server.controller.AbstractControllerTest;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.TimeoutException;
 | 
			
		||||
 | 
			
		||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 | 
			
		||||
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
    static final int TIMEOUT = 30;
 | 
			
		||||
 | 
			
		||||
    final String TOTALIZER = "Totalizer";
 | 
			
		||||
    final int TTL = 99999;
 | 
			
		||||
    final String GENERIC_CUMULATIVE_OBJ = "genericCumulativeObj";
 | 
			
		||||
    final List<Long> ts = List.of(10L, 20L, 30L, 40L, 60L, 70L, 50L, 80L);
 | 
			
		||||
    final List<Long> msgValue = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    TimeseriesService timeseriesService;
 | 
			
		||||
 | 
			
		||||
    TbMsgTimeseriesNodeConfiguration configuration;
 | 
			
		||||
    Tenant savedTenant;
 | 
			
		||||
    User tenantAdmin;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void beforeTest() throws Exception {
 | 
			
		||||
        configuration = new TbMsgTimeseriesNodeConfiguration();
 | 
			
		||||
        configuration.setIgnoreMetadataTs(true);
 | 
			
		||||
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
 | 
			
		||||
        Tenant tenant = new Tenant();
 | 
			
		||||
        tenant.setTitle("My tenant");
 | 
			
		||||
        savedTenant = doPost("/api/tenant", tenant, Tenant.class);
 | 
			
		||||
        Assert.assertNotNull(savedTenant);
 | 
			
		||||
 | 
			
		||||
        tenantAdmin = new User();
 | 
			
		||||
        tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
 | 
			
		||||
        tenantAdmin.setTenantId(savedTenant.getId());
 | 
			
		||||
        tenantAdmin.setEmail("tenant2@thingsboard.org");
 | 
			
		||||
        tenantAdmin.setFirstName("Joe");
 | 
			
		||||
        tenantAdmin.setLastName("Downs");
 | 
			
		||||
 | 
			
		||||
        tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @After
 | 
			
		||||
    public void afterTest() throws Exception {
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
        doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSequentialTimeseriesPersistence() throws Exception {
 | 
			
		||||
        Asset asset = saveAsset("Asset");
 | 
			
		||||
 | 
			
		||||
        Device deviceA = saveDevice("Device A");
 | 
			
		||||
        Device deviceB = saveDevice("Device B");
 | 
			
		||||
        Device deviceC = saveDevice("Device C");
 | 
			
		||||
        Device deviceD = saveDevice("Device D");
 | 
			
		||||
        List<Device> devices = List.of(deviceA, deviceB, deviceC, deviceD);
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < 2; i++) {
 | 
			
		||||
            int idx = i * (devices.size());
 | 
			
		||||
            saveLatestTsForAssetAndDevice(devices, asset, idx);
 | 
			
		||||
            checkDiffBetweenLatestTsForDevicesAndAsset(devices, asset);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Device saveDevice(String name) throws Exception {
 | 
			
		||||
        Device device = new Device();
 | 
			
		||||
        device.setName(name);
 | 
			
		||||
        device.setType("default");
 | 
			
		||||
        Device savedDevice = doPost("/api/device", device, Device.class);
 | 
			
		||||
        Assert.assertNotNull(savedDevice);
 | 
			
		||||
        return savedDevice;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Asset saveAsset(String name) throws Exception {
 | 
			
		||||
        Asset asset = new Asset();
 | 
			
		||||
        asset.setName(name);
 | 
			
		||||
        asset.setType("default");
 | 
			
		||||
        Asset savedAsset = doPost("/api/asset", asset, Asset.class);
 | 
			
		||||
        Assert.assertNotNull(savedAsset);
 | 
			
		||||
        return savedAsset;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void saveLatestTsForAssetAndDevice(List<Device> devices, Asset asset, int idx) throws ExecutionException, InterruptedException, TimeoutException {
 | 
			
		||||
        for (Device device : devices) {
 | 
			
		||||
            TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(),
 | 
			
		||||
                    device.getId(),
 | 
			
		||||
                    getTbMsgMetadata(device.getName(), ts.get(idx)),
 | 
			
		||||
                    TbMsgDataType.JSON,
 | 
			
		||||
                    getTbMsgData(msgValue.get(idx)));
 | 
			
		||||
            saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx));
 | 
			
		||||
            saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()));
 | 
			
		||||
            idx++;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void checkDiffBetweenLatestTsForDevicesAndAsset(List<Device> devices, Asset asset) throws ExecutionException, InterruptedException, TimeoutException {
 | 
			
		||||
        TsKvEntry assetTsKvEntry = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ);
 | 
			
		||||
        Assert.assertTrue(assetTsKvEntry.getJsonValue().isPresent());
 | 
			
		||||
        JsonObject assetJsonObject = new JsonParser().parse(assetTsKvEntry.getJsonValue().get()).getAsJsonObject();
 | 
			
		||||
        for (Device device : devices) {
 | 
			
		||||
            Long assetValue = assetJsonObject.get(device.getName()).getAsLong();
 | 
			
		||||
            TsKvEntry deviceLatest = getTsKvLatest(device.getId(), TOTALIZER);
 | 
			
		||||
            Assert.assertTrue(deviceLatest.getLongValue().isPresent());
 | 
			
		||||
            Long deviceValue = deviceLatest.getLongValue().get();
 | 
			
		||||
            Assert.assertEquals(assetValue, deviceValue);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    String getTbMsgData(long value) {
 | 
			
		||||
        return "{\"Totalizer\": " + value + "}";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    TbMsgMetaData getTbMsgMetadata(String name, long ts) {
 | 
			
		||||
        Map<String, String> metadata = new HashMap<>();
 | 
			
		||||
        metadata.put("deviceName", name);
 | 
			
		||||
        metadata.put("ts", String.valueOf(ts));
 | 
			
		||||
        return new TbMsgMetaData(metadata);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException {
 | 
			
		||||
        TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isIgnoreMetadataTs()), new LongDataEntry(TOTALIZER, value));
 | 
			
		||||
        saveTimeseries(entityId, tsKvEntry);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void saveAssetTsEntry(Asset asset, String key, long value, long ts) throws ExecutionException, InterruptedException, TimeoutException {
 | 
			
		||||
        Optional<String> tsKvEntryOpt = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ).getJsonValue();
 | 
			
		||||
        TsKvEntry saveTsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(GENERIC_CUMULATIVE_OBJ, getJsonObject(key, value, tsKvEntryOpt).toString()));
 | 
			
		||||
        saveTimeseries(asset.getId(), saveTsKvEntry);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @NotNull
 | 
			
		||||
    private JsonObject getJsonObject(String key, long value, Optional<String> tsKvEntryOpt) {
 | 
			
		||||
        JsonObject jsonObject = new JsonObject();
 | 
			
		||||
        if (tsKvEntryOpt.isPresent()) {
 | 
			
		||||
            jsonObject = new JsonParser().parse(tsKvEntryOpt.get()).getAsJsonObject();
 | 
			
		||||
        }
 | 
			
		||||
        jsonObject.addProperty(key, value);
 | 
			
		||||
        return jsonObject;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void saveTimeseries(EntityId entityId, TsKvEntry saveTsKvEntry) throws InterruptedException, ExecutionException, TimeoutException {
 | 
			
		||||
        timeseriesService.save(savedTenant.getId(), entityId, List.of(saveTsKvEntry), TTL).get(TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    TsKvEntry getTsKvLatest(EntityId entityId, String key) throws InterruptedException, ExecutionException, TimeoutException {
 | 
			
		||||
        List<TsKvEntry> tsKvEntries = timeseriesService.findLatest(
 | 
			
		||||
                savedTenant.getTenantId(),
 | 
			
		||||
                entityId,
 | 
			
		||||
                List.of(key)).get(TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
        Assert.assertEquals(1, tsKvEntries.size());
 | 
			
		||||
        return tsKvEntries.get(0);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -77,7 +77,7 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
            ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        long ts = config.isSaveWithMsgTs() ? msg.getTs() : getTs(msg);
 | 
			
		||||
        long ts = computeTs(msg, config.isIgnoreMetadataTs());
 | 
			
		||||
        String src = msg.getData();
 | 
			
		||||
        Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
 | 
			
		||||
        if (tsKvMap.isEmpty()) {
 | 
			
		||||
@ -102,6 +102,10 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static long computeTs(TbMsg msg, boolean saveWithMsgTs) {
 | 
			
		||||
        return saveWithMsgTs ? System.currentTimeMillis() : getTs(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static long getTs(TbMsg msg) {
 | 
			
		||||
        long ts = -1;
 | 
			
		||||
        String tsStr = msg.getMetaData().getValue("ts");
 | 
			
		||||
 | 
			
		||||
@ -23,14 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
 | 
			
		||||
 | 
			
		||||
    private long defaultTTL;
 | 
			
		||||
    private boolean skipLatestPersistence;
 | 
			
		||||
    private boolean saveWithMsgTs;
 | 
			
		||||
    private boolean ignoreMetadataTs;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
 | 
			
		||||
        configuration.setDefaultTTL(0L);
 | 
			
		||||
        configuration.setSkipLatestPersistence(false);
 | 
			
		||||
        configuration.setSkipLatestPersistence(false);
 | 
			
		||||
        configuration.setIgnoreMetadataTs(false);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user