From 91c5a0514051dcbfc9dfb63e15860e373e14bb78 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 7 May 2025 10:15:56 +0300 Subject: [PATCH 1/4] fixed telemetry updates with old timestamps --- .../DefaultTbLocalSubscriptionService.java | 4 +- .../subscription/TbEntityDataSubCtx.java | 49 +++---- ...DefaultTbLocalSubscriptionServiceTest.java | 73 ++++++++-- .../subscription/TbEntityDataSubCtxTest.java | 130 ++++++++++++++++++ 4 files changed, 216 insertions(+), 40 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 5716976b6f..eed68b0c5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -348,7 +348,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (sub.isLatestValues()) { for (TsKvEntry kv : data) { Long stateTs = keyStates.get(kv.getKey()); - if (stateTs == null || kv.getTs() > stateTs) { + if (stateTs == null || kv.getTs() >= stateTs) { if (updateData == null) { updateData = new ArrayList<>(); } @@ -362,7 +362,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer for (TsKvEntry kv : data) { Long stateTs = keyStates.get(kv.getKey()); if (stateTs != null) { - if (!sub.isLatestValues() || kv.getTs() > stateTs) { + if (!sub.isLatestValues() || kv.getTs() >= stateTs) { if (updateData == null) { updateData = new ArrayList<>(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index d397bbc05b..883b5307d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -94,35 +94,36 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); - }); + subscriptionUpdate.getData().forEach((key, data) -> data.stream() + .filter(o -> o instanceof Object[] && ((Object[]) o).length > 0 && ((Object[]) o)[0] instanceof Long) + .max(Comparator.comparingLong(o -> (Long) ((Object[]) o)[0])) + .ifPresent(max -> { + Object[] arr = (Object[]) max; + latestUpdate.put(key, new TsValue((Long) arr[0], (String) arr[1])); + })); EntityData entityData = getDataForEntity(entityId); if (entityData != null && entityData.getLatest() != null) { - Map latestCtxValues = entityData.getLatest().get(keyType); + Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, key -> new HashMap<>()); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); - if (latestCtxValues != null) { - latestCtxValues.forEach((k, v) -> { - TsValue update = latestUpdate.get(k); - if (update != null) { - //Ignore notifications about deleted keys - if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) { - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } - } else { - log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k); + latestCtxValues.forEach((k, v) -> { + TsValue update = latestUpdate.get(k); + if (update != null) { + //Ignore notifications about deleted keys + if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) { + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); } + } else { + log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k); } - }); - //Setting new values - latestUpdate.forEach(latestCtxValues::put); - } + } + }); + //Setting new values + latestCtxValues.putAll(latestUpdate); } if (!latestUpdate.isEmpty()) { Map> latestMap = Collections.singletonMap(keyType, latestUpdate); diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java index 2630f95f9b..623dd6b9e2 100644 --- a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java @@ -32,11 +32,16 @@ import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.service.ws.WebSocketSessionRef; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; import java.util.HashMap; @@ -44,8 +49,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; @@ -54,11 +62,17 @@ import static org.mockito.Mockito.when; public class DefaultTbLocalSubscriptionServiceTest { - ListAppender testLogAppender; - TbLocalSubscriptionService subscriptionService; + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + private ListAppender testLogAppender; + private TbLocalSubscriptionService subscriptionService; + private ListeningExecutorService executorService; @BeforeEach public void setUp() throws Exception { + this.executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); testLogAppender = new ListAppender<>(); testLogAppender.start(); @@ -69,7 +83,9 @@ public class DefaultTbLocalSubscriptionServiceTest { PartitionService partitionService = mock(); when(partitionService.resolve(any(), any(), any())).thenReturn(TopicPartitionInfo.builder().build()); subscriptionService = new DefaultTbLocalSubscriptionService(mock(), mock(), mock(), partitionService, mock(), mock(), mock(), rateLimitService); + ReflectionTestUtils.setField(subscriptionService, "serviceId", "serviceId"); + ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService); } @AfterEach @@ -79,27 +95,22 @@ public class DefaultTbLocalSubscriptionServiceTest { Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); logger.detachAppender(testLogAppender); } + if (executorService != null) { + executorService.shutdownNow(); + } } @Test public void addSubscriptionConcurrentModificationTest() throws Exception { - ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); - TenantId tenantId = new TenantId(UUID.randomUUID()); - DeviceId deviceId = new DeviceId(UUID.randomUUID()); WebSocketSessionRef sessionRef = mock(); - ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService); List> futures = new ArrayList<>(); - try { - subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType()); - for (int i = 0; i < 50; i++) { - futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef))); - } - Futures.allAsList(futures).get(); - } finally { - executorService.shutdownNow(); + subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType()); + for (int i = 0; i < 50; i++) { + futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef))); } + Futures.allAsList(futures).get(); List logs = testLogAppender.list; boolean exceptionLogged = logs.stream() @@ -123,4 +134,38 @@ public class DefaultTbLocalSubscriptionServiceTest { .keyStates(keys) .build(); } + + @Test + public void updateOldTelemetryTest() { + WebSocketSessionRef sessionRef = mock(); + + String key = "temperature"; + long ts = 1237465456L; + + Map keyStates = new HashMap<>(); + keyStates.put(key, ts); + + AtomicReference capturedUpdate = new AtomicReference<>(); + TbTimeSeriesSubscription tsSubscription = TbTimeSeriesSubscription.builder() + .tenantId(tenantId) + .entityId(deviceId) + .subscriptionId(2) + .sessionId(RandomStringUtils.randomAlphanumeric(5)) + .keyStates(keyStates) + .allKeys(true) + .latestValues(true) + .updateProcessor((subscription, update) -> capturedUpdate.set(update)) + .build(); + + subscriptionService.addSubscription(tsSubscription, sessionRef); + + // Send telemetry with ts == stateTs + TsKvEntry kv = new BasicTsKvEntry(ts, new StringDataEntry(key, "42")); + subscriptionService.onTimeSeriesUpdate(deviceId, List.of(kv), mock(TbCallback.class)); + + TelemetrySubscriptionUpdate update = capturedUpdate.get(); + assertNotNull(update); + assertTrue(update.getLatestValues().containsKey(key)); + } + } diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java new file mode 100644 index 0000000000..1f0da1a30c --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java @@ -0,0 +1,130 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.TsValue; +import org.thingsboard.server.service.ws.WebSocketService; +import org.thingsboard.server.service.ws.WebSocketSessionRef; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbEntityDataSubCtxTest { + + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + private final Integer cmdId = RandomUtils.nextInt(); + private final Integer subscriptionId = RandomUtils.nextInt(); + private final String serviceId = RandomStringUtils.randomAlphanumeric(10); + private final String sessionId = RandomStringUtils.randomAlphanumeric(10); + + private final int maxEntitiesPerDataSubscription = 100; + + private TbEntityDataSubCtx subCtx; + @Mock + private WebSocketService webSocketService; + @Mock + private WebSocketSessionRef webSocketSessionRef; + + @BeforeEach + public void setUp() { + when(webSocketSessionRef.getSessionId()).thenReturn(sessionId); + subCtx = new TbEntityDataSubCtx(serviceId, webSocketService, mock(), mock(), mock(), mock(), webSocketSessionRef, cmdId, maxEntitiesPerDataSubscription); + + Map subToEntityIdMap = new HashMap<>(); + subToEntityIdMap.put(subscriptionId, deviceId); + ReflectionTestUtils.setField(subCtx, "subToEntityIdMap", subToEntityIdMap); + + long now = System.currentTimeMillis(); + long oldTs = now - 1_000_000; + + Map latestCtxValues = new HashMap<>(); + latestCtxValues.put("key", new TsValue(oldTs, "15")); + Map> latest = new HashMap<>(); + latest.put(EntityKeyType.TIME_SERIES, latestCtxValues); + + EntityData entityData = new EntityData(); + entityData.setEntityId(deviceId); + entityData.setLatest(latest); + + PageData data = new PageData<>(List.of(entityData), 1, 1, true); + ReflectionTestUtils.setField(subCtx, "data", data); + } + + @Test + public void testSendLatestWsMsg() { + long ts = System.currentTimeMillis(); + List telemetry = List.of( + new BasicTsKvEntry(ts - 50000, new LongDataEntry("key", 42L), 34L), + new BasicTsKvEntry(ts - 20000, new LongDataEntry("key", 17L), 78L) + ); + + TelemetrySubscriptionUpdate subUpdate = new TelemetrySubscriptionUpdate(subscriptionId, telemetry); + + subCtx.sendWsMsg(sessionId, subUpdate, EntityKeyType.TIME_SERIES, true); + + Map> expectedLatest = new HashMap<>(); + Map expectedLatestCtxValues = new HashMap<>(); + expectedLatestCtxValues.put("key", new TsValue(ts - 20000, "17")); // use latest telemetry + expectedLatest.put(EntityKeyType.TIME_SERIES, expectedLatestCtxValues); + + EntityData expectedEntityData = new EntityData(); + expectedEntityData.setEntityId(deviceId); + expectedEntityData.setLatest(expectedLatest); + + List expected = List.of(expectedEntityData); + + ArgumentCaptor cmdUpdateCaptor = ArgumentCaptor.forClass(CmdUpdate.class); + then(webSocketService).should().sendUpdate(eq(sessionId), cmdUpdateCaptor.capture()); + CmdUpdate cmdUpdate = cmdUpdateCaptor.getValue(); + assertThat(cmdUpdate).isInstanceOf(EntityDataUpdate.class); + EntityDataUpdate entityDataUpdate = (EntityDataUpdate) cmdUpdate; + assertThat(entityDataUpdate.getCmdId()).isEqualTo(cmdId); + assertThat(entityDataUpdate.getData()).isNull(); + assertThat(entityDataUpdate.getUpdate()).isEqualTo(expected); + assertThat(entityDataUpdate.getAllowedEntities()).isEqualTo(maxEntitiesPerDataSubscription); + } + +} From c8b13fc3507645fb6d690d7b131708067b0b9891 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 7 May 2025 11:13:36 +0300 Subject: [PATCH 2/4] added await for test --- .../DefaultTbLocalSubscriptionServiceTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java index 623dd6b9e2..3bb470830b 100644 --- a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java @@ -49,8 +49,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -163,9 +165,13 @@ public class DefaultTbLocalSubscriptionServiceTest { TsKvEntry kv = new BasicTsKvEntry(ts, new StringDataEntry(key, "42")); subscriptionService.onTimeSeriesUpdate(deviceId, List.of(kv), mock(TbCallback.class)); - TelemetrySubscriptionUpdate update = capturedUpdate.get(); - assertNotNull(update); - assertTrue(update.getLatestValues().containsKey(key)); + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + TelemetrySubscriptionUpdate update = capturedUpdate.get(); + assertNotNull(update); + assertTrue(update.getLatestValues().containsKey(key)); + }); } } From 69c9c4616cb88a64308c942e9024c9204652e587 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 28 May 2025 20:24:48 +0300 Subject: [PATCH 3/4] Refactoring for TelemetrySubscriptionUpdate --- .../TbAbstractEntityQuerySubCtx.java | 12 +++-- .../subscription/TbAlarmDataSubCtx.java | 6 +-- .../subscription/TbEntityDataSubCtx.java | 40 +++++++-------- .../service/ws/DefaultWebSocketService.java | 2 +- .../sub/TelemetrySubscriptionUpdate.java | 50 ++++++++++++------- 5 files changed, 61 insertions(+), 49 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java index 39e486e0a8..2c3127bc4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java @@ -44,6 +44,7 @@ import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -154,9 +155,8 @@ public abstract class TbAbstractEntityQuerySubCtx ex private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, Map dynamicValueKeySubMap) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); }); boolean invalidateFilter = false; @@ -283,6 +283,12 @@ public abstract class TbAbstractEntityQuerySubCtx ex } } + protected TsValue getLatest(List values) { + return values.stream() + .max(Comparator.comparing(TsValue::getTs)) + .orElse(null); + } + @Data public static class DynamicValueKey { @Getter diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index f6b4067543..b9956288a4 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -38,7 +38,6 @@ import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; -import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.query.EntityKeyMapping; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; @@ -191,9 +190,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); }); EntityData entityData = entitiesMap.get(entityId); entityData.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 883b5307d2..6e3ebdc13b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -43,7 +43,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -94,16 +93,12 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, data) -> data.stream() - .filter(o -> o instanceof Object[] && ((Object[]) o).length > 0 && ((Object[]) o)[0] instanceof Long) - .max(Comparator.comparingLong(o -> (Long) ((Object[]) o)[0])) - .ifPresent(max -> { - Object[] arr = (Object[]) max; - latestUpdate.put(key, new TsValue((Long) arr[0], (String) arr[1])); - })); + subscriptionUpdate.getData().forEach((key, values) -> { + latestUpdate.put(key, getLatest(values)); + }); EntityData entityData = getDataForEntity(entityId); if (entityData != null && entityData.getLatest() != null) { - Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, key -> new HashMap<>()); + Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, __ -> new HashMap<>()); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); latestCtxValues.forEach((k, v) -> { TsValue update = latestUpdate.get(k); @@ -134,40 +129,39 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendTsWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map> tsUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - tsUpdate.computeIfAbsent(k, key -> new ArrayList<>()).add(new TsValue((Long) data[0], (String) data[1])); + subscriptionUpdate.getData().forEach((key, values) -> { + tsUpdate.put(key, new ArrayList<>(values)); }); Map latestCtxValues = getLatestTsValuesForEntity(entityId); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); if (latestCtxValues != null) { - latestCtxValues.forEach((k, v) -> { - List updateList = tsUpdate.get(k); + latestCtxValues.forEach((key, latest) -> { + List updateList = tsUpdate.get(key); if (updateList != null) { for (TsValue update : new ArrayList<>(updateList)) { - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + if (update.getTs() < latest.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs()); // Looks like this is redundant feature and our UI is ready to merge the updates. //updateList.remove(update); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + } else if ((update.getTs() == latest.getTs() && update.getValue().equals(latest.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), key, update.getTs()); updateList.remove(update); } if (updateList.isEmpty()) { - tsUpdate.remove(k); + tsUpdate.remove(key); } } } }); //Setting new values - tsUpdate.forEach((k, v) -> { - Optional maxValue = v.stream().max(Comparator.comparingLong(TsValue::getTs)); - maxValue.ifPresent(max -> latestCtxValues.put(k, max)); + tsUpdate.forEach((key, values) -> { + values.stream().max(Comparator.comparingLong(TsValue::getTs)) + .ifPresent(latest -> latestCtxValues.put(key, latest)); }); } if (!tsUpdate.isEmpty()) { Map tsMap = new HashMap<>(); - tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()]))); + tsUpdate.forEach((key, values) -> tsMap.put(key, values.toArray(new TsValue[0]))); EntityData entityData = new EntityData(entityId, null, tsMap); sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 2e4fe9730a..cbe6663663 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -274,7 +274,7 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void sendUpdate(String sessionId, int cmdId, TelemetrySubscriptionUpdate update) { // We substitute the subscriptionId with cmdId for old-style subscriptions. - doSendUpdate(sessionId, cmdId, update.copyWithNewSubscriptionId(cmdId)); + doSendUpdate(sessionId, cmdId, update.withSubscriptionId(cmdId)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java index b22b021a03..6fca1a159a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java @@ -16,12 +16,16 @@ package org.thingsboard.server.service.ws.telemetry.sub; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.With; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -29,8 +33,13 @@ import java.util.stream.Collectors; @AllArgsConstructor public class TelemetrySubscriptionUpdate { + + @Getter + @With private final int subscriptionId; + @Getter private int errorCode; + @Getter private String errorMsg; private Map> data; @@ -66,11 +75,27 @@ public class TelemetrySubscriptionUpdate { this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); } - public int getSubscriptionId() { - return subscriptionId; - } + public Map> getData() { + if (data == null || data.isEmpty()) { + return Collections.emptyMap(); + } - public Map> getData() { + Map> data = new HashMap<>(); + this.data.forEach((key, entries) -> { + if (entries.isEmpty()) { + return; + } + + List values = new ArrayList<>(entries.size()); + entries.forEach(object -> { + if (!(object instanceof Object[] entry) || entry.length < 2) { + return; + } + TsValue tsValue = new TsValue((Long) entry[0], (String) entry[1]); + values.add(tsValue); + }); + data.put(key, values); + }); return data; } @@ -86,28 +111,17 @@ public class TelemetrySubscriptionUpdate { } } - public int getErrorCode() { - return errorCode; - } - - public String getErrorMsg() { - return errorMsg; - } - - public TelemetrySubscriptionUpdate copyWithNewSubscriptionId(int subscriptionId){ - return new TelemetrySubscriptionUpdate(subscriptionId, errorCode, errorMsg, data); - } - @Override public String toString() { StringBuilder result = new StringBuilder("TelemetrySubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data="); data.forEach((k, v) -> { result.append(k).append("=["); - for(Object a : v){ - result.append(Arrays.toString((Object[])a)).append("|"); + for (Object a : v) { + result.append(Arrays.toString((Object[]) a)).append("|"); } result.append("]"); }); return result.toString(); } + } From da6fa2c00b29638eb9906ab3922b561a1c098c61 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 29 May 2025 11:47:11 +0300 Subject: [PATCH 4/4] Refactor TelemetrySubscriptionUpdate --- .../service/subscription/TbAbstractEntityQuerySubCtx.java | 2 +- .../server/service/subscription/TbAlarmDataSubCtx.java | 2 +- .../server/service/subscription/TbEntityDataSubCtx.java | 4 ++-- .../ws/telemetry/sub/TelemetrySubscriptionUpdate.java | 6 ++---- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java index 2c3127bc4d..9d51c2ec4a 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractEntityQuerySubCtx.java @@ -155,7 +155,7 @@ public abstract class TbAbstractEntityQuerySubCtx ex private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, Map dynamicValueKeySubMap) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, values) -> { + subscriptionUpdate.getValues().forEach((key, values) -> { latestUpdate.put(key, getLatest(values)); }); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index b9956288a4..e896b520bc 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -190,7 +190,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, values) -> { + subscriptionUpdate.getValues().forEach((key, values) -> { latestUpdate.put(key, getLatest(values)); }); EntityData entityData = entitiesMap.get(entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 6e3ebdc13b..3df03283d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -93,7 +93,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, values) -> { + subscriptionUpdate.getValues().forEach((key, values) -> { latestUpdate.put(key, getLatest(values)); }); EntityData entityData = getDataForEntity(entityId); @@ -129,7 +129,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendTsWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map> tsUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((key, values) -> { + subscriptionUpdate.getValues().forEach((key, values) -> { tsUpdate.put(key, new ArrayList<>(values)); }); Map latestCtxValues = getLatestTsValuesForEntity(entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java index 6fca1a159a..e24ab368ec 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/sub/TelemetrySubscriptionUpdate.java @@ -31,15 +31,13 @@ import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; +@Getter @AllArgsConstructor public class TelemetrySubscriptionUpdate { - @Getter @With private final int subscriptionId; - @Getter private int errorCode; - @Getter private String errorMsg; private Map> data; @@ -75,7 +73,7 @@ public class TelemetrySubscriptionUpdate { this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); } - public Map> getData() { + public Map> getValues() { if (data == null || data.isEmpty()) { return Collections.emptyMap(); }