diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index a9277a50e8..2252211bd1 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -171,11 +171,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); if (ctx != null) { log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); - if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) { + if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) { Collection oldSubIds = ctx.clearSubscriptions(); oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); } - //TODO: cleanup old subscription; } else { log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); ctx = createSubCtx(session, cmd); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 47f0885ccf..607c91afeb 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -37,10 +37,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiConsumer; +import java.util.Optional; @Slf4j @Data @@ -145,12 +146,7 @@ public class TbEntityDataSubCtx { .subscriptionId(subIdx) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityData.getEntityId()) - .updateConsumer(new BiConsumer() { - @Override - public void accept(String sessionId, SubscriptionUpdate subscriptionUpdate) { - sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues); - } - }) + .updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues)) .allKeys(false) .keyStates(keyStates) .build(); @@ -179,50 +175,93 @@ public class TbEntityDataSubCtx { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); - Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); - }); - EntityData entityData = getDataForEntity(entityId); - if (entityData != null && entityData.getLatest() != null) { - Map latestCtxValues = entityData.getLatest().get(keyType); - log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); - if (latestCtxValues != null) { - latestCtxValues.forEach((k, v) -> { - TsValue update = latestUpdate.get(k); - if (update != null) { - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } - } - }); - //Setting new values - latestUpdate.forEach(latestCtxValues::put); - } - } - if (!latestUpdate.isEmpty()) { - if (resultToLatestValues) { - Map> latestMap = Collections.singletonMap(keyType, latestUpdate); - entityData = new EntityData(entityId, latestMap, null); - } else { - Map tsMap = new HashMap<>(); - latestUpdate.forEach((key, tsValue) -> { - tsMap.put(key, new TsValue[]{tsValue}); - }); - entityData = new EntityData(entityId, null, tsMap); - } - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); + if (resultToLatestValues) { + sendLatestWsMsg(entityId, sessionId, subscriptionUpdate, keyType); + } else { + sendTsWsMsg(entityId, sessionId, subscriptionUpdate, keyType); } } else { log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); } } + private void sendLatestWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { + Map latestUpdate = new HashMap<>(); + subscriptionUpdate.getData().forEach((k, v) -> { + Object[] data = (Object[]) v.get(0); + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + }); + EntityData entityData = getDataForEntity(entityId); + if (entityData != null && entityData.getLatest() != null) { + Map latestCtxValues = entityData.getLatest().get(keyType); + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); + if (latestCtxValues != null) { + latestCtxValues.forEach((k, v) -> { + TsValue update = latestUpdate.get(k); + if (update != null) { + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } + } + }); + //Setting new values + latestUpdate.forEach(latestCtxValues::put); + } + } + if (!latestUpdate.isEmpty()) { + Map> latestMap = Collections.singletonMap(keyType, latestUpdate); + entityData = new EntityData(entityId, latestMap, null); + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); + } + } + + private void sendTsWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { + Map> tsUpdate = new HashMap<>(); + subscriptionUpdate.getData().forEach((k, v) -> { + Object[] data = (Object[]) v.get(0); + tsUpdate.computeIfAbsent(k, key -> new ArrayList<>()).add(new TsValue((Long) data[0], (String) data[1])); + }); + EntityData entityData = getDataForEntity(entityId); + if (entityData != null && entityData.getLatest() != null) { + Map latestCtxValues = entityData.getLatest().get(keyType); + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); + if (latestCtxValues != null) { + latestCtxValues.forEach((k, v) -> { + List updateList = tsUpdate.get(k); + if (updateList != null) { + for (TsValue update : new ArrayList<>(updateList)) { + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + updateList.remove(update); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + updateList.remove(update); + } + if (updateList.isEmpty()) { + tsUpdate.remove(k); + } + } + } + }); + //Setting new values + tsUpdate.forEach((k, v) -> { + Optional maxValue = v.stream().max(Comparator.comparingLong(TsValue::getTs)); + maxValue.ifPresent(max -> latestCtxValues.put(k, max)); + }); + } + } + if (!tsUpdate.isEmpty()) { + Map tsMap = new HashMap<>(); + tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()]))); + entityData = new EntityData(entityId, null, tsMap); + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); + } + } + private EntityData getDataForEntity(EntityId entityId) { return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null); } diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index ded943b523..20c2bac13d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -21,6 +21,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.Device; @@ -113,7 +114,9 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { DeviceTypeFilter dtf = new DeviceTypeFilter(); dtf.setDeviceNameFilter("D"); dtf.setDeviceType("default"); - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + EntityDataQuery edq = new EntityDataQuery(dtf, + new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityHistoryCmd historyCmd = new EntityHistoryCmd(); historyCmd.setKeys(Arrays.asList("temperature")); @@ -148,11 +151,11 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { msg = wsClient.waitForReply(); update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); - pageData = update.getData(); - Assert.assertNotNull(pageData); - Assert.assertEquals(1, pageData.getData().size()); - Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId()); - TsValue[] tsArray = pageData.getData().get(0).getTimeseries().get("temperature"); + List dataList = update.getUpdate(); + Assert.assertNotNull(dataList); + Assert.assertEquals(1, dataList.size()); + Assert.assertEquals(device.getId(), dataList.get(0).getEntityId()); + TsValue[] tsArray = dataList.get(0).getTimeseries().get("temperature"); Assert.assertEquals(3, tsArray.length); Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]); Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]); @@ -223,8 +226,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { now = System.currentTimeMillis(); TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L)); - wsClient.registerWaitForUpdate(); + Thread.sleep(100); sendTelemetry(device, Arrays.asList(dataPoint4)); msg = wsClient.waitForUpdate(); diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java index 0a5dae47b7..15da972cf5 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java @@ -26,9 +26,9 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ - "org.thingsboard.server.controller.sql.WebsocketApiSqlTest", +// "org.thingsboard.server.controller.sql.WebsocketApiSqlTest", // "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest", -// "org.thingsboard.server.controller.sql.*Test", + "org.thingsboard.server.controller.sql.*Test", }) public class ControllerSqlTestSuite { diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml index a9e244f1e9..f991a40078 100644 --- a/application/src/test/resources/logback.xml +++ b/application/src/test/resources/logback.xml @@ -7,7 +7,7 @@ - +