web socket test and client refactored to simplify usage

This commit is contained in:
Sergey Matvienko 2022-04-21 17:39:41 +03:00
parent 7cdce3d9f8
commit 96eff9cf1a
2 changed files with 172 additions and 227 deletions

View File

@ -18,12 +18,12 @@ package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -44,63 +44,51 @@ import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@Slf4j
public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Autowired
private TelemetrySubscriptionService tsService;
Device device;
DeviceTypeFilter dtf;
@Before
public void setUp() throws Exception {
loginTenantAdmin();
}
@Test
public void testEntityDataHistoryWsCmd() throws Exception {
Device device = new Device();
device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
dtf = new DeviceTypeFilter(device.getType(), device.getName());
}
@After
public void tearDown() throws Exception {
doDelete("/api/device/" + device.getId().getId())
.andExpect(status().isOk());
}
@Test
public void testEntityDataHistoryWsCmd() throws Exception {
List<String> keys = List.of("temperature");
long now = System.currentTimeMillis();
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf,
new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataUpdate update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf);
EntityHistoryCmd historyCmd = new EntityHistoryCmd();
historyCmd.setKeys(Arrays.asList("temperature"));
historyCmd.setAgg(Aggregation.NONE);
historyCmd.setLimit(1000);
historyCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1));
historyCmd.setEndTs(now);
EntityDataCmd cmd = new EntityDataCmd(1, edq, historyCmd, null, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
@ -116,9 +104,8 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
sendTelemetry(device, tsData);
Thread.sleep(100);
getWsClient().send(mapper.writeValueAsString(wrapper));
msg = getWsClient().waitForReply();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf);
Assert.assertEquals(1, update.getCmdId());
List<EntityData> dataList = update.getUpdate();
Assert.assertNotNull(dataList);
@ -133,41 +120,16 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Test
public void testEntityDataTimeSeriesWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataUpdate update = getWsClient().sendEntityDataQuery(dtf);
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
Assert.assertEquals(1, pageData.getData().size());
Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
TimeSeriesCmd tsCmd = new TimeSeriesCmd();
tsCmd.setKeys(Arrays.asList("temperature"));
tsCmd.setAgg(Aggregation.NONE);
tsCmd.setLimit(1000);
tsCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1));
tsCmd.setTimeWindow(TimeUnit.HOURS.toMillis(1));
TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L));
TsKvEntry dataPoint2 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(2), new LongDataEntry("temperature", 43L));
TsKvEntry dataPoint3 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(3), new LongDataEntry("temperature", 44L));
@ -176,12 +138,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
sendTelemetry(device, tsData);
Thread.sleep(100);
cmd = new EntityDataCmd(1, null, null, null, tsCmd);
wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
msg = getWsClient().waitForReply();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().subscribeTsUpdate(List.of("temperature"), now, TimeUnit.HOURS.toMillis(1));
Assert.assertEquals(1, update.getCmdId());
List<EntityData> listData = update.getUpdate();
Assert.assertNotNull(listData);
@ -198,7 +155,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
getWsClient().registerWaitForUpdate();
Thread.sleep(100);
sendTelemetry(device, Arrays.asList(dataPoint4));
msg = getWsClient().waitForUpdate();
String msg = getWsClient().waitForUpdate();
update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
@ -214,44 +171,26 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Test
public void testEntityCountWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(System.currentTimeMillis(), new LongDataEntry("temperature", 42L));
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Collections.singletonList(dataPoint1));
DeviceTypeFilter dtf1 = new DeviceTypeFilter();
dtf1.setDeviceNameFilter("D");
dtf1.setDeviceType("default");
EntityCountQuery edq1 = new EntityCountQuery(dtf1, Collections.emptyList());
EntityCountQuery edq1 = new EntityCountQuery(dtf, Collections.emptyList());
EntityCountCmd cmd1 = new EntityCountCmd(1, edq1);
TelemetryPluginCmdsWrapper wrapper1 = new TelemetryPluginCmdsWrapper();
wrapper1.setEntityCountCmds(Collections.singletonList(cmd1));
getWsClient().send(cmd1);
getWsClient().send(mapper.writeValueAsString(wrapper1));
String msg1 = getWsClient().waitForReply();
EntityCountUpdate update1 = mapper.readValue(msg1, EntityCountUpdate.class);
EntityCountUpdate update1 = getWsClient().parseCountReply(getWsClient().waitForReply());
Assert.assertEquals(1, update1.getCmdId());
Assert.assertEquals(1, update1.getCount());
DeviceTypeFilter dtf2 = new DeviceTypeFilter();
dtf2.setDeviceNameFilter("D");
dtf2.setDeviceType("non-existing-device-type");
DeviceTypeFilter dtf2 = new DeviceTypeFilter("non-existing-device-type", "D");
EntityCountQuery edq2 = new EntityCountQuery(dtf2, Collections.emptyList());
EntityCountCmd cmd2 = new EntityCountCmd(2, edq2);
TelemetryPluginCmdsWrapper wrapper2 = new TelemetryPluginCmdsWrapper();
wrapper2.setEntityCountCmds(Collections.singletonList(cmd2));
getWsClient().send(mapper.writeValueAsString(wrapper2));
getWsClient().send(cmd2);
String msg2 = getWsClient().waitForReply();
EntityCountUpdate update2 = mapper.readValue(msg2, EntityCountUpdate.class);
EntityCountUpdate update2 = getWsClient().parseCountReply(getWsClient().waitForReply());
Assert.assertEquals(2, update2.getCmdId());
Assert.assertEquals(0, update2.getCount());
@ -263,19 +202,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
highTemperatureFilter.setPredicate(predicate);
highTemperatureFilter.setValueType(EntityKeyValueType.NUMERIC);
DeviceTypeFilter dtf3 = new DeviceTypeFilter();
dtf3.setDeviceNameFilter("D");
dtf3.setDeviceType("default");
DeviceTypeFilter dtf3 = new DeviceTypeFilter("default", "D");
EntityCountQuery edq3 = new EntityCountQuery(dtf3, Collections.singletonList(highTemperatureFilter));
EntityCountCmd cmd3 = new EntityCountCmd(3, edq3);
getWsClient().send(cmd3);
TelemetryPluginCmdsWrapper wrapper3 = new TelemetryPluginCmdsWrapper();
wrapper3.setEntityCountCmds(Collections.singletonList(cmd3));
getWsClient().send(mapper.writeValueAsString(wrapper3));
String msg3 = getWsClient().waitForReply();
EntityCountUpdate update3 = mapper.readValue(msg3, EntityCountUpdate.class);
EntityCountUpdate update3 = getWsClient().parseCountReply(getWsClient().waitForReply());
Assert.assertEquals(3, update3.getCmdId());
Assert.assertEquals(1, update3.getCount());
@ -287,47 +219,26 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
highTemperatureFilter2.setPredicate(predicate2);
highTemperatureFilter2.setValueType(EntityKeyValueType.NUMERIC);
DeviceTypeFilter dtf4 = new DeviceTypeFilter();
dtf4.setDeviceNameFilter("D");
dtf4.setDeviceType("default");
DeviceTypeFilter dtf4 = new DeviceTypeFilter("default", "D");
EntityCountQuery edq4 = new EntityCountQuery(dtf4, Collections.singletonList(highTemperatureFilter2));
EntityCountCmd cmd4 = new EntityCountCmd(4, edq4);
TelemetryPluginCmdsWrapper wrapper4 = new TelemetryPluginCmdsWrapper();
wrapper4.setEntityCountCmds(Collections.singletonList(cmd4));
getWsClient().send(mapper.writeValueAsString(wrapper4));
getWsClient().send(cmd4);
String msg4 = getWsClient().waitForReply();
EntityCountUpdate update4 = mapper.readValue(msg4, EntityCountUpdate.class);
EntityCountUpdate update4 = getWsClient().parseCountReply(getWsClient().waitForReply());
Assert.assertEquals(4, update4.getCmdId());
Assert.assertEquals(0, update4.getCount());
}
@Test
public void testEntityDataLatestWidgetFlow() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
List<EntityKey> keys = List.of(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
long now = System.currentTimeMillis() - 100;
long now = System.currentTimeMillis();
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), keys, Collections.emptyList());
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(),
Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")), Collections.emptyList());
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
EntityDataUpdate update = getWsClient().sendEntityDataQuery(edq);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
@ -341,17 +252,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
sendTelemetry(device, tsData);
Thread.sleep(100);
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")));
cmd = new EntityDataCmd(1, null, null, latestCmd, null);
wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
msg = getWsClient().waitForReply();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().subscribeLatestUpdate(keys);
Assert.assertEquals(1, update.getCmdId());
@ -368,9 +269,8 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
getWsClient().registerWaitForUpdate();
sendTelemetry(device, Arrays.asList(dataPoint2));
msg = getWsClient().waitForUpdate();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().parseDataReply(getWsClient().waitForUpdate());
Assert.assertEquals(1, update.getCmdId());
List<EntityData> eData = update.getUpdate();
Assert.assertNotNull(eData);
@ -383,7 +283,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
//Sending update from the past, while latest value has new timestamp;
getWsClient().registerWaitForUpdate();
sendTelemetry(device, Arrays.asList(dataPoint1));
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
//Sending duplicate update again
@ -395,29 +295,11 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Test
public void testEntityDataLatestTsWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
List<EntityKey> keys = List.of(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf);
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")));
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
@ -433,13 +315,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
Thread.sleep(100);
cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
msg = getWsClient().waitForReply();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().subscribeLatestUpdate(keys, dtf);
Assert.assertEquals(1, update.getCmdId());
@ -456,9 +332,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
getWsClient().registerWaitForUpdate();
sendTelemetry(device, Arrays.asList(dataPoint2));
msg = getWsClient().waitForUpdate();
update = mapper.readValue(msg, EntityDataUpdate.class);
update = getWsClient().parseDataReply(getWsClient().waitForUpdate());
Assert.assertEquals(1, update.getCmdId());
List<EntityData> eData = update.getUpdate();
Assert.assertNotNull(eData);
@ -471,7 +345,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
//Sending update from the past, while latest value has new timestamp;
getWsClient().registerWaitForUpdate();
sendTelemetry(device, Arrays.asList(dataPoint1));
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
//Sending duplicate update again
@ -483,31 +357,10 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Test
public void testEntityDataLatestAttrWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
List<EntityKey> keys = List.of(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"));
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey")));
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
Assert.assertNotNull(msg);
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
@ -517,7 +370,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs());
Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
getWsClient().registerWaitForUpdate();
Thread.sleep(500);
@ -525,7 +377,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
msg = getWsClient().waitForUpdate();
String msg = getWsClient().waitForUpdate();
Assert.assertNotNull(msg);
update = mapper.readValue(msg, EntityDataUpdate.class);
@ -574,35 +426,15 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
@Test
public void testEntityDataLatestAttrTypesWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
List<EntityKey> keys = List.of(
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"),
new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey"),
new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey"),
new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey"));
DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataUpdate update = getWsClient().subscribeLatestUpdate(keys, dtf);
LatestValueCmd latestCmd = new LatestValueCmd();
List<EntityKey> keys = new ArrayList<>();
keys.add(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"));
keys.add(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey"));
keys.add(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey"));
keys.add(new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey"));
latestCmd.setKeys(keys);
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(mapper.writeValueAsString(wrapper));
String msg = getWsClient().waitForReply();
EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());
PageData<EntityData> pageData = update.getData();
Assert.assertNotNull(pageData);
@ -628,7 +460,7 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
msg = getWsClient().waitForUpdate();
String msg = getWsClient().waitForUpdate();
Assert.assertNotNull(msg);
update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId());

View File

@ -18,9 +18,25 @@ package org.thingsboard.server.controller;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityFilter;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
import java.net.URI;
import java.nio.channels.NotYetConnectedException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -73,6 +89,18 @@ public class TbTestWebSocketClient extends WebSocketClient {
super.send(text);
}
public void send(EntityDataCmd cmd) throws NotYetConnectedException {
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
this.send(JacksonUtil.toString(wrapper));
}
public void send(EntityCountCmd cmd) throws NotYetConnectedException {
TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
wrapper.setEntityCountCmds(Collections.singletonList(cmd));
this.send(JacksonUtil.toString(wrapper));
}
public String waitForUpdate() {
return waitForUpdate(TimeUnit.SECONDS.toMillis(3));
}
@ -94,4 +122,89 @@ public class TbTestWebSocketClient extends WebSocketClient {
}
return lastMsg;
}
public EntityDataUpdate parseDataReply(String msg) {
return JacksonUtil.fromString(msg, EntityDataUpdate.class);
}
public EntityCountUpdate parseCountReply(String msg) {
return JacksonUtil.fromString(msg, EntityCountUpdate.class);
}
public EntityDataUpdate subscribeLatestUpdate(List<EntityKey> keys, EntityFilter entityFilter) {
EntityDataQuery edq = new EntityDataQuery(entityFilter, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
return subscribeLatestUpdate(keys, edq);
}
public EntityDataUpdate subscribeLatestUpdate(List<EntityKey> keys) {
return subscribeLatestUpdate(keys, (EntityDataQuery) null);
}
public EntityDataUpdate subscribeLatestUpdate(List<EntityKey> keys, EntityDataQuery edq) {
LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(keys);
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
send(cmd);
return parseDataReply(waitForReply());
}
public EntityDataUpdate subscribeTsUpdate(List<String> keys, long startTs, long timeWindow) {
return subscribeTsUpdate(keys, startTs, timeWindow, null);
}
public EntityDataUpdate subscribeTsUpdate(List<String> keys, long startTs, long timeWindow, EntityDataQuery edq) {
TimeSeriesCmd tsCmd = new TimeSeriesCmd();
tsCmd.setKeys(keys);
tsCmd.setAgg(Aggregation.NONE);
tsCmd.setLimit(1000);
tsCmd.setStartTs(startTs - timeWindow);
tsCmd.setTimeWindow(timeWindow);
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, tsCmd);
send(cmd);
return parseDataReply(waitForReply());
}
public EntityDataUpdate sendHistoryCmd(List<String> keys, long startTs, long timeWindow) {
return sendHistoryCmd(keys, startTs, timeWindow, (EntityDataQuery) null);
}
public EntityDataUpdate sendHistoryCmd(List<String> keys, long startTs, long timeWindow, EntityFilter entityFilter) {
EntityDataQuery edq = new EntityDataQuery(entityFilter,
new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
return sendHistoryCmd(keys, startTs, timeWindow, edq);
}
public EntityDataUpdate sendHistoryCmd(List<String> keys, long startTs, long timeWindow, EntityDataQuery edq) {
EntityHistoryCmd historyCmd = new EntityHistoryCmd();
historyCmd.setKeys(keys);
historyCmd.setAgg(Aggregation.NONE);
historyCmd.setLimit(1000);
historyCmd.setStartTs(startTs - timeWindow);
historyCmd.setEndTs(startTs);
EntityDataCmd cmd = new EntityDataCmd(1, edq, historyCmd, null, null);
send(cmd);
return parseDataReply(this.waitForReply());
}
public EntityDataUpdate sendEntityDataQuery(EntityDataQuery edq) {
log.warn("sendEntityDataQuery {}", edq);
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null);
send(cmd);
String msg = this.waitForReply();
return parseDataReply(msg);
}
public EntityDataUpdate sendEntityDataQuery(EntityFilter entityFilter) {
log.warn("sendEntityDataQuery {}", entityFilter);
EntityDataQuery edq = new EntityDataQuery(entityFilter, new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
return sendEntityDataQuery(edq);
}
}