diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java index 39e486e0a8..2c3127bc4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java @@ -44,6 +44,7 @@ import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -154,9 +155,8 @@ public abstract class TbAbstractEntityQuerySubCtx ex private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, Map dynamicValueKeySubMap) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); }); boolean invalidateFilter = false; @@ -283,6 +283,12 @@ public abstract class TbAbstractEntityQuerySubCtx ex } } + protected TsValue getLatest(List values) { + return values.stream() + .max(Comparator.comparing(TsValue::getTs)) + .orElse(null); + } + @Data public static class DynamicValueKey { @Getter diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index f6b4067543..b9956288a4 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -38,7 +38,6 @@ import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; -import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.query.EntityKeyMapping; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; @@ -191,9 +190,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); }); EntityData entityData = entitiesMap.get(entityId); entityData.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 883b5307d2..6e3ebdc13b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -43,7 +43,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -94,16 +93,12 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, data) -> data.stream() - .filter(o -> o instanceof Object[] && ((Object[]) o).length > 0 && ((Object[]) o)[0] instanceof Long) - .max(Comparator.comparingLong(o -> (Long) ((Object[]) o)[0])) - .ifPresent(max -> { - Object[] arr = (Object[]) max; - latestUpdate.put(key, new TsValue((Long) arr[0], (String) arr[1])); - })); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); + }); EntityData entityData = getDataForEntity(entityId); if (entityData != null && entityData.getLatest() != null) { - Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, key -> new HashMap<>()); + Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, __ -> new HashMap<>()); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); latestCtxValues.forEach((k, v) -> { TsValue update = latestUpdate.get(k); @@ -134,40 +129,39 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendTsWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map> tsUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - tsUpdate.computeIfAbsent(k, key -> new ArrayList<>()).add(new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + tsUpdate.put(key, new ArrayList<>(values)); }); Map latestCtxValues = getLatestTsValuesForEntity(entityId); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); if (latestCtxValues != null) { - latestCtxValues.forEach((k, v) -> { - List updateList = tsUpdate.get(k); + latestCtxValues.forEach((key, latest) -> { + List updateList = tsUpdate.get(key); if (updateList != null) { for (TsValue update : new ArrayList<>(updateList)) { - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + if (update.getTs() < latest.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs()); // Looks like this is redundant feature and our UI is ready to merge the updates. //updateList.remove(update); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + } else if ((update.getTs() == latest.getTs() && update.getValue().equals(latest.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs()); updateList.remove(update); } if (updateList.isEmpty()) { - tsUpdate.remove(k); + tsUpdate.remove(key); } } } }); //Setting new values - tsUpdate.forEach((k, v) -> { - Optional maxValue = v.stream().max(Comparator.comparingLong(TsValue::getTs)); - maxValue.ifPresent(max -> latestCtxValues.put(k, max)); + tsUpdate.forEach((key, values) -> { + values.stream().max(Comparator.comparingLong(TsValue::getTs)) + .ifPresent(latest -> latestCtxValues.put(key, latest)); }); } if (!tsUpdate.isEmpty()) { Map tsMap = new HashMap<>(); - tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()]))); + tsUpdate.forEach((key, values) -> tsMap.put(key, values.toArray(new TsValue[0]))); EntityData entityData = new EntityData(entityId, null, tsMap); sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 2e4fe9730a..cbe6663663 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -274,7 +274,7 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void sendUpdate(String sessionId, int cmdId, TelemetrySubscriptionUpdate update) { // We substitute the subscriptionId with cmdId for old-style subscriptions. - doSendUpdate(sessionId, cmdId, update.copyWithNewSubscriptionId(cmdId)); + doSendUpdate(sessionId, cmdId, update.withSubscriptionId(cmdId)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java index b22b021a03..6fca1a159a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java @@ -16,12 +16,16 @@ package org.thingsboard.server.service.ws.telemetry.sub; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.With; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -29,8 +33,13 @@ import java.util.stream.Collectors; @AllArgsConstructor public class TelemetrySubscriptionUpdate { + + @Getter + @With private final int subscriptionId; + @Getter private int errorCode; + @Getter private String errorMsg; private Map> data; @@ -66,11 +75,27 @@ public class TelemetrySubscriptionUpdate { this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); } - public int getSubscriptionId() { - return subscriptionId; - } + public Map> getData() { + if (data == null || data.isEmpty()) { + return Collections.emptyMap(); + } - public Map> getData() { + Map> data = new HashMap<>(); + this.data.forEach((key, entries) -> { + if (entries.isEmpty()) { + return; + } + + List values = new ArrayList<>(entries.size()); + entries.forEach(object -> { + if (!(object instanceof Object[] entry) || entry.length < 2) { + return; + } + TsValue tsValue = new TsValue((Long) entry[0], (String) entry[1]); + values.add(tsValue); + }); + data.put(key, values); + }); return data; } @@ -86,28 +111,17 @@ public class TelemetrySubscriptionUpdate { } } - public int getErrorCode() { - return errorCode; - } - - public String getErrorMsg() { - return errorMsg; - } - - public TelemetrySubscriptionUpdate copyWithNewSubscriptionId(int subscriptionId){ - return new TelemetrySubscriptionUpdate(subscriptionId, errorCode, errorMsg, data); - } - @Override public String toString() { StringBuilder result = new StringBuilder("TelemetrySubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data="); data.forEach((k, v) -> { result.append(k).append("=["); - for(Object a : v){ - result.append(Arrays.toString((Object[])a)).append("|"); + for (Object a : v) { + result.append(Arrays.toString((Object[]) a)).append("|"); } result.append("]"); }); return result.toString(); } + }