From 96eff9cf1a80fa91600efe6f571600962602b579 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 21 Apr 2022 17:39:41 +0300 Subject: [PATCH] web socket test and client refactored to simplify usage --- .../controller/BaseWebsocketApiTest.java | 286 ++++-------------- .../controller/TbTestWebSocketClient.java | 113 +++++++ 2 files changed, 172 insertions(+), 227 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 111f14acb0..90d6ac7b9d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -18,12 +18,12 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.FutureCallback; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -44,63 +44,51 @@ import org.thingsboard.server.common.data.query.NumericFilterPredicate; import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; -import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; -import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; -import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; -import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; -import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + @Slf4j public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Autowired private TelemetrySubscriptionService tsService; + Device device; + DeviceTypeFilter dtf; + @Before public void setUp() throws Exception { loginTenantAdmin(); - } - @Test - public void testEntityDataHistoryWsCmd() throws Exception { - Device device = new Device(); + device = new Device(); device.setName("Device"); device.setType("default"); device.setLabel("testLabel" + (int) (Math.random() * 1000)); device = doPost("/api/device", device, Device.class); + dtf = new DeviceTypeFilter(device.getType(), device.getName()); + } + @After + public void tearDown() throws Exception { + doDelete("/api/device/" + device.getId().getId()) + .andExpect(status().isOk()); + } + + @Test + public void testEntityDataHistoryWsCmd() throws Exception { + List keys = List.of("temperature"); long now = System.currentTimeMillis(); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, - new EntityDataPageLink(1, 0, null, null), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + EntityDataUpdate update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf); - EntityHistoryCmd historyCmd = new EntityHistoryCmd(); - historyCmd.setKeys(Arrays.asList("temperature")); - historyCmd.setAgg(Aggregation.NONE); - historyCmd.setLimit(1000); - historyCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1)); - historyCmd.setEndTs(now); - EntityDataCmd cmd = new EntityDataCmd(1, edq, historyCmd, null, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); @@ -116,9 +104,8 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { sendTelemetry(device, tsData); Thread.sleep(100); - getWsClient().send(mapper.writeValueAsString(wrapper)); - msg = getWsClient().waitForReply(); - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf); + Assert.assertEquals(1, update.getCmdId()); List dataList = update.getUpdate(); Assert.assertNotNull(dataList); @@ -133,41 +120,16 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Test public void testEntityDataTimeSeriesWsCmd() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); - long now = System.currentTimeMillis(); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + EntityDataUpdate update = getWsClient().sendEntityDataQuery(dtf); - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); Assert.assertEquals(1, pageData.getData().size()); Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId()); - TimeSeriesCmd tsCmd = new TimeSeriesCmd(); - tsCmd.setKeys(Arrays.asList("temperature")); - tsCmd.setAgg(Aggregation.NONE); - tsCmd.setLimit(1000); - tsCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1)); - tsCmd.setTimeWindow(TimeUnit.HOURS.toMillis(1)); - TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L)); TsKvEntry dataPoint2 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(2), new LongDataEntry("temperature", 43L)); TsKvEntry dataPoint3 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(3), new LongDataEntry("temperature", 44L)); @@ -176,12 +138,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { sendTelemetry(device, tsData); Thread.sleep(100); - cmd = new EntityDataCmd(1, null, null, null, tsCmd); - wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - getWsClient().send(mapper.writeValueAsString(wrapper)); - msg = getWsClient().waitForReply(); - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().subscribeTsUpdate(List.of("temperature"), now, TimeUnit.HOURS.toMillis(1)); Assert.assertEquals(1, update.getCmdId()); List listData = update.getUpdate(); Assert.assertNotNull(listData); @@ -198,7 +155,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { getWsClient().registerWaitForUpdate(); Thread.sleep(100); sendTelemetry(device, Arrays.asList(dataPoint4)); - msg = getWsClient().waitForUpdate(); + String msg = getWsClient().waitForUpdate(); update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); @@ -214,44 +171,26 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Test public void testEntityCountWsCmd() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); - AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(System.currentTimeMillis(), new LongDataEntry("temperature", 42L)); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Collections.singletonList(dataPoint1)); - DeviceTypeFilter dtf1 = new DeviceTypeFilter(); - dtf1.setDeviceNameFilter("D"); - dtf1.setDeviceType("default"); - EntityCountQuery edq1 = new EntityCountQuery(dtf1, Collections.emptyList()); - + EntityCountQuery edq1 = new EntityCountQuery(dtf, Collections.emptyList()); EntityCountCmd cmd1 = new EntityCountCmd(1, edq1); - TelemetryPluginCmdsWrapper wrapper1 = new TelemetryPluginCmdsWrapper(); - wrapper1.setEntityCountCmds(Collections.singletonList(cmd1)); + getWsClient().send(cmd1); - getWsClient().send(mapper.writeValueAsString(wrapper1)); - String msg1 = getWsClient().waitForReply(); - EntityCountUpdate update1 = mapper.readValue(msg1, EntityCountUpdate.class); + EntityCountUpdate update1 = getWsClient().parseCountReply(getWsClient().waitForReply()); Assert.assertEquals(1, update1.getCmdId()); Assert.assertEquals(1, update1.getCount()); - DeviceTypeFilter dtf2 = new DeviceTypeFilter(); - dtf2.setDeviceNameFilter("D"); - dtf2.setDeviceType("non-existing-device-type"); + DeviceTypeFilter dtf2 = new DeviceTypeFilter("non-existing-device-type", "D"); EntityCountQuery edq2 = new EntityCountQuery(dtf2, Collections.emptyList()); - EntityCountCmd cmd2 = new EntityCountCmd(2, edq2); - TelemetryPluginCmdsWrapper wrapper2 = new TelemetryPluginCmdsWrapper(); - wrapper2.setEntityCountCmds(Collections.singletonList(cmd2)); - getWsClient().send(mapper.writeValueAsString(wrapper2)); + getWsClient().send(cmd2); String msg2 = getWsClient().waitForReply(); - EntityCountUpdate update2 = mapper.readValue(msg2, EntityCountUpdate.class); + EntityCountUpdate update2 = getWsClient().parseCountReply(getWsClient().waitForReply()); Assert.assertEquals(2, update2.getCmdId()); Assert.assertEquals(0, update2.getCount()); @@ -263,19 +202,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { highTemperatureFilter.setPredicate(predicate); highTemperatureFilter.setValueType(EntityKeyValueType.NUMERIC); - DeviceTypeFilter dtf3 = new DeviceTypeFilter(); - dtf3.setDeviceNameFilter("D"); - dtf3.setDeviceType("default"); + DeviceTypeFilter dtf3 = new DeviceTypeFilter("default", "D"); EntityCountQuery edq3 = new EntityCountQuery(dtf3, Collections.singletonList(highTemperatureFilter)); - EntityCountCmd cmd3 = new EntityCountCmd(3, edq3); + getWsClient().send(cmd3); - TelemetryPluginCmdsWrapper wrapper3 = new TelemetryPluginCmdsWrapper(); - wrapper3.setEntityCountCmds(Collections.singletonList(cmd3)); - getWsClient().send(mapper.writeValueAsString(wrapper3)); - - String msg3 = getWsClient().waitForReply(); - EntityCountUpdate update3 = mapper.readValue(msg3, EntityCountUpdate.class); + EntityCountUpdate update3 = getWsClient().parseCountReply(getWsClient().waitForReply()); Assert.assertEquals(3, update3.getCmdId()); Assert.assertEquals(1, update3.getCount()); @@ -287,47 +219,26 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { highTemperatureFilter2.setPredicate(predicate2); highTemperatureFilter2.setValueType(EntityKeyValueType.NUMERIC); - DeviceTypeFilter dtf4 = new DeviceTypeFilter(); - dtf4.setDeviceNameFilter("D"); - dtf4.setDeviceType("default"); + DeviceTypeFilter dtf4 = new DeviceTypeFilter("default", "D"); EntityCountQuery edq4 = new EntityCountQuery(dtf4, Collections.singletonList(highTemperatureFilter2)); - EntityCountCmd cmd4 = new EntityCountCmd(4, edq4); - TelemetryPluginCmdsWrapper wrapper4 = new TelemetryPluginCmdsWrapper(); - wrapper4.setEntityCountCmds(Collections.singletonList(cmd4)); - getWsClient().send(mapper.writeValueAsString(wrapper4)); + getWsClient().send(cmd4); - String msg4 = getWsClient().waitForReply(); - EntityCountUpdate update4 = mapper.readValue(msg4, EntityCountUpdate.class); + EntityCountUpdate update4 = getWsClient().parseCountReply(getWsClient().waitForReply()); Assert.assertEquals(4, update4.getCmdId()); Assert.assertEquals(0, update4.getCount()); } @Test public void testEntityDataLatestWidgetFlow() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); + List keys = List.of(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")); + long now = System.currentTimeMillis() - 100; - long now = System.currentTimeMillis(); + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), keys, Collections.emptyList()); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), - Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")), Collections.emptyList()); - - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); + EntityDataUpdate update = getWsClient().sendEntityDataQuery(edq); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); @@ -341,17 +252,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { List tsData = Arrays.asList(dataPoint1); sendTelemetry(device, tsData); - Thread.sleep(100); - - LatestValueCmd latestCmd = new LatestValueCmd(); - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"))); - cmd = new EntityDataCmd(1, null, null, latestCmd, null); - wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - msg = getWsClient().waitForReply(); - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().subscribeLatestUpdate(keys); Assert.assertEquals(1, update.getCmdId()); @@ -368,9 +269,8 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { getWsClient().registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint2)); - msg = getWsClient().waitForUpdate(); - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().parseDataReply(getWsClient().waitForUpdate()); Assert.assertEquals(1, update.getCmdId()); List eData = update.getUpdate(); Assert.assertNotNull(eData); @@ -383,7 +283,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { //Sending update from the past, while latest value has new timestamp; getWsClient().registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint1)); - msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); Assert.assertNull(msg); //Sending duplicate update again @@ -395,29 +295,11 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Test public void testEntityDataLatestTsWsCmd() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); - long now = System.currentTimeMillis(); + List keys = List.of(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf); - LatestValueCmd latestCmd = new LatestValueCmd(); - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"))); - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); @@ -433,13 +315,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { Thread.sleep(100); - cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - msg = getWsClient().waitForReply(); - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().subscribeLatestUpdate(keys, dtf); Assert.assertEquals(1, update.getCmdId()); @@ -456,9 +332,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { getWsClient().registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint2)); - msg = getWsClient().waitForUpdate(); - - update = mapper.readValue(msg, EntityDataUpdate.class); + update = getWsClient().parseDataReply(getWsClient().waitForUpdate()); Assert.assertEquals(1, update.getCmdId()); List eData = update.getUpdate(); Assert.assertNotNull(eData); @@ -471,7 +345,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { //Sending update from the past, while latest value has new timestamp; getWsClient().registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint1)); - msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); Assert.assertNull(msg); //Sending duplicate update again @@ -483,31 +357,10 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Test public void testEntityDataLatestAttrWsCmd() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); - long now = System.currentTimeMillis(); + List keys = List.of(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey")); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); - - LatestValueCmd latestCmd = new LatestValueCmd(); - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"))); - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - Assert.assertNotNull(msg); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); + EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); @@ -517,7 +370,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs()); Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue()); - getWsClient().registerWaitForUpdate(); Thread.sleep(500); @@ -525,7 +377,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { List tsData = Arrays.asList(dataPoint1); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData); - msg = getWsClient().waitForUpdate(); + String msg = getWsClient().waitForUpdate(); Assert.assertNotNull(msg); update = mapper.readValue(msg, EntityDataUpdate.class); @@ -574,35 +426,15 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { @Test public void testEntityDataLatestAttrTypesWsCmd() throws Exception { - Device device = new Device(); - device.setName("Device"); - device.setType("default"); - device.setLabel("testLabel" + (int) (Math.random() * 1000)); - device = doPost("/api/device", device, Device.class); - long now = System.currentTimeMillis(); + List keys = List.of( + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"), + new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey"), + new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey"), + new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey")); - DeviceTypeFilter dtf = new DeviceTypeFilter(); - dtf.setDeviceNameFilter("D"); - dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf); - LatestValueCmd latestCmd = new LatestValueCmd(); - List keys = new ArrayList<>(); - keys.add(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey")); - keys.add(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey")); - keys.add(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey")); - keys.add(new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey")); - latestCmd.setKeys(keys); - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(mapper.writeValueAsString(wrapper)); - String msg = getWsClient().waitForReply(); - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); PageData pageData = update.getData(); Assert.assertNotNull(pageData); @@ -628,7 +460,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData); - msg = getWsClient().waitForUpdate(); + String msg = getWsClient().waitForUpdate(); Assert.assertNotNull(msg); update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 3abd311aa3..013547aa24 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -18,9 +18,25 @@ package org.thingsboard.server.controller; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityFilter; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; import java.net.URI; import java.nio.channels.NotYetConnectedException; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -73,6 +89,18 @@ public class TbTestWebSocketClient extends WebSocketClient { super.send(text); } + public void send(EntityDataCmd cmd) throws NotYetConnectedException { + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + wrapper.setEntityDataCmds(Collections.singletonList(cmd)); + this.send(JacksonUtil.toString(wrapper)); + } + + public void send(EntityCountCmd cmd) throws NotYetConnectedException { + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + wrapper.setEntityCountCmds(Collections.singletonList(cmd)); + this.send(JacksonUtil.toString(wrapper)); + } + public String waitForUpdate() { return waitForUpdate(TimeUnit.SECONDS.toMillis(3)); } @@ -94,4 +122,89 @@ public class TbTestWebSocketClient extends WebSocketClient { } return lastMsg; } + + public EntityDataUpdate parseDataReply(String msg) { + return JacksonUtil.fromString(msg, EntityDataUpdate.class); + } + + public EntityCountUpdate parseCountReply(String msg) { + return JacksonUtil.fromString(msg, EntityCountUpdate.class); + } + + public EntityDataUpdate subscribeLatestUpdate(List keys, EntityFilter entityFilter) { + EntityDataQuery edq = new EntityDataQuery(entityFilter, new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + return subscribeLatestUpdate(keys, edq); + } + + public EntityDataUpdate subscribeLatestUpdate(List keys) { + return subscribeLatestUpdate(keys, (EntityDataQuery) null); + } + + public EntityDataUpdate subscribeLatestUpdate(List keys, EntityDataQuery edq) { + LatestValueCmd latestCmd = new LatestValueCmd(); + latestCmd.setKeys(keys); + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); + send(cmd); + return parseDataReply(waitForReply()); + } + + public EntityDataUpdate subscribeTsUpdate(List keys, long startTs, long timeWindow) { + return subscribeTsUpdate(keys, startTs, timeWindow, null); + } + + public EntityDataUpdate subscribeTsUpdate(List keys, long startTs, long timeWindow, EntityDataQuery edq) { + TimeSeriesCmd tsCmd = new TimeSeriesCmd(); + tsCmd.setKeys(keys); + tsCmd.setAgg(Aggregation.NONE); + tsCmd.setLimit(1000); + tsCmd.setStartTs(startTs - timeWindow); + tsCmd.setTimeWindow(timeWindow); + + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, tsCmd); + + send(cmd); + return parseDataReply(waitForReply()); + } + + public EntityDataUpdate sendHistoryCmd(List keys, long startTs, long timeWindow) { + return sendHistoryCmd(keys, startTs, timeWindow, (EntityDataQuery) null); + } + + public EntityDataUpdate sendHistoryCmd(List keys, long startTs, long timeWindow, EntityFilter entityFilter) { + EntityDataQuery edq = new EntityDataQuery(entityFilter, + new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + return sendHistoryCmd(keys, startTs, timeWindow, edq); + } + + public EntityDataUpdate sendHistoryCmd(List keys, long startTs, long timeWindow, EntityDataQuery edq) { + EntityHistoryCmd historyCmd = new EntityHistoryCmd(); + historyCmd.setKeys(keys); + historyCmd.setAgg(Aggregation.NONE); + historyCmd.setLimit(1000); + historyCmd.setStartTs(startTs - timeWindow); + historyCmd.setEndTs(startTs); + + EntityDataCmd cmd = new EntityDataCmd(1, edq, historyCmd, null, null); + + send(cmd); + return parseDataReply(this.waitForReply()); + } + + public EntityDataUpdate sendEntityDataQuery(EntityDataQuery edq) { + log.warn("sendEntityDataQuery {}", edq); + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null); + send(cmd); + String msg = this.waitForReply(); + return parseDataReply(msg); + } + + public EntityDataUpdate sendEntityDataQuery(EntityFilter entityFilter) { + log.warn("sendEntityDataQuery {}", entityFilter); + EntityDataQuery edq = new EntityDataQuery(entityFilter, new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + return sendEntityDataQuery(edq); + } + }