From 91c5a0514051dcbfc9dfb63e15860e373e14bb78 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 7 May 2025 10:15:56 +0300 Subject: [PATCH 1/2] fixed telemetry updates with old timestamps --- .../DefaultTbLocalSubscriptionService.java | 4 +- .../subscription/TbEntityDataSubCtx.java | 49 +++---- ...DefaultTbLocalSubscriptionServiceTest.java | 73 ++++++++-- .../subscription/TbEntityDataSubCtxTest.java | 130 ++++++++++++++++++ 4 files changed, 216 insertions(+), 40 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 5716976b6f..eed68b0c5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -348,7 +348,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (sub.isLatestValues()) { for (TsKvEntry kv : data) { Long stateTs = keyStates.get(kv.getKey()); - if (stateTs == null || kv.getTs() > stateTs) { + if (stateTs == null || kv.getTs() >= stateTs) { if (updateData == null) { updateData = new ArrayList<>(); } @@ -362,7 +362,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer for (TsKvEntry kv : data) { Long stateTs = keyStates.get(kv.getKey()); if (stateTs != null) { - if (!sub.isLatestValues() || kv.getTs() > stateTs) { + if (!sub.isLatestValues() || kv.getTs() >= stateTs) { if (updateData == null) { updateData = new ArrayList<>(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index d397bbc05b..883b5307d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -94,35 +94,36 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); - }); + subscriptionUpdate.getData().forEach((key, data) -> data.stream() + .filter(o -> o instanceof Object[] && ((Object[]) o).length > 0 && ((Object[]) o)[0] instanceof Long) + .max(Comparator.comparingLong(o -> (Long) ((Object[]) o)[0])) + .ifPresent(max -> { + Object[] arr = (Object[]) max; + latestUpdate.put(key, new TsValue((Long) arr[0], (String) arr[1])); + })); EntityData entityData = getDataForEntity(entityId); if (entityData != null && entityData.getLatest() != null) { - Map latestCtxValues = entityData.getLatest().get(keyType); + Map latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, key -> new HashMap<>()); log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); - if (latestCtxValues != null) { - latestCtxValues.forEach((k, v) -> { - TsValue update = latestUpdate.get(k); - if (update != null) { - //Ignore notifications about deleted keys - if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) { - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } - } else { - log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k); + latestCtxValues.forEach((k, v) -> { + TsValue update = latestUpdate.get(k); + if (update != null) { + //Ignore notifications about deleted keys + if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) { + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); } + } else { + log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k); } - }); - //Setting new values - latestUpdate.forEach(latestCtxValues::put); - } + } + }); + //Setting new values + latestCtxValues.putAll(latestUpdate); } if (!latestUpdate.isEmpty()) { Map> latestMap = Collections.singletonMap(keyType, latestUpdate); diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java index 2630f95f9b..623dd6b9e2 100644 --- a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java @@ -32,11 +32,16 @@ import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.service.ws.WebSocketSessionRef; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; import java.util.HashMap; @@ -44,8 +49,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; @@ -54,11 +62,17 @@ import static org.mockito.Mockito.when; public class DefaultTbLocalSubscriptionServiceTest { - ListAppender testLogAppender; - TbLocalSubscriptionService subscriptionService; + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + private ListAppender testLogAppender; + private TbLocalSubscriptionService subscriptionService; + private ListeningExecutorService executorService; @BeforeEach public void setUp() throws Exception { + this.executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); testLogAppender = new ListAppender<>(); testLogAppender.start(); @@ -69,7 +83,9 @@ public class DefaultTbLocalSubscriptionServiceTest { PartitionService partitionService = mock(); when(partitionService.resolve(any(), any(), any())).thenReturn(TopicPartitionInfo.builder().build()); subscriptionService = new DefaultTbLocalSubscriptionService(mock(), mock(), mock(), partitionService, mock(), mock(), mock(), rateLimitService); + ReflectionTestUtils.setField(subscriptionService, "serviceId", "serviceId"); + ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService); } @AfterEach @@ -79,27 +95,22 @@ public class DefaultTbLocalSubscriptionServiceTest { Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class); logger.detachAppender(testLogAppender); } + if (executorService != null) { + executorService.shutdownNow(); + } } @Test public void addSubscriptionConcurrentModificationTest() throws Exception { - ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); - TenantId tenantId = new TenantId(UUID.randomUUID()); - DeviceId deviceId = new DeviceId(UUID.randomUUID()); WebSocketSessionRef sessionRef = mock(); - ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService); List> futures = new ArrayList<>(); - try { - subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType()); - for (int i = 0; i < 50; i++) { - futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef))); - } - Futures.allAsList(futures).get(); - } finally { - executorService.shutdownNow(); + subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType()); + for (int i = 0; i < 50; i++) { + futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef))); } + Futures.allAsList(futures).get(); List logs = testLogAppender.list; boolean exceptionLogged = logs.stream() @@ -123,4 +134,38 @@ public class DefaultTbLocalSubscriptionServiceTest { .keyStates(keys) .build(); } + + @Test + public void updateOldTelemetryTest() { + WebSocketSessionRef sessionRef = mock(); + + String key = "temperature"; + long ts = 1237465456L; + + Map keyStates = new HashMap<>(); + keyStates.put(key, ts); + + AtomicReference capturedUpdate = new AtomicReference<>(); + TbTimeSeriesSubscription tsSubscription = TbTimeSeriesSubscription.builder() + .tenantId(tenantId) + .entityId(deviceId) + .subscriptionId(2) + .sessionId(RandomStringUtils.randomAlphanumeric(5)) + .keyStates(keyStates) + .allKeys(true) + .latestValues(true) + .updateProcessor((subscription, update) -> capturedUpdate.set(update)) + .build(); + + subscriptionService.addSubscription(tsSubscription, sessionRef); + + // Send telemetry with ts == stateTs + TsKvEntry kv = new BasicTsKvEntry(ts, new StringDataEntry(key, "42")); + subscriptionService.onTimeSeriesUpdate(deviceId, List.of(kv), mock(TbCallback.class)); + + TelemetrySubscriptionUpdate update = capturedUpdate.get(); + assertNotNull(update); + assertTrue(update.getLatestValues().containsKey(key)); + } + } diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java new file mode 100644 index 0000000000..1f0da1a30c --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtxTest.java @@ -0,0 +1,130 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.TsValue; +import org.thingsboard.server.service.ws.WebSocketService; +import org.thingsboard.server.service.ws.WebSocketSessionRef; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbEntityDataSubCtxTest { + + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + private final Integer cmdId = RandomUtils.nextInt(); + private final Integer subscriptionId = RandomUtils.nextInt(); + private final String serviceId = RandomStringUtils.randomAlphanumeric(10); + private final String sessionId = RandomStringUtils.randomAlphanumeric(10); + + private final int maxEntitiesPerDataSubscription = 100; + + private TbEntityDataSubCtx subCtx; + @Mock + private WebSocketService webSocketService; + @Mock + private WebSocketSessionRef webSocketSessionRef; + + @BeforeEach + public void setUp() { + when(webSocketSessionRef.getSessionId()).thenReturn(sessionId); + subCtx = new TbEntityDataSubCtx(serviceId, webSocketService, mock(), mock(), mock(), mock(), webSocketSessionRef, cmdId, maxEntitiesPerDataSubscription); + + Map subToEntityIdMap = new HashMap<>(); + subToEntityIdMap.put(subscriptionId, deviceId); + ReflectionTestUtils.setField(subCtx, "subToEntityIdMap", subToEntityIdMap); + + long now = System.currentTimeMillis(); + long oldTs = now - 1_000_000; + + Map latestCtxValues = new HashMap<>(); + latestCtxValues.put("key", new TsValue(oldTs, "15")); + Map> latest = new HashMap<>(); + latest.put(EntityKeyType.TIME_SERIES, latestCtxValues); + + EntityData entityData = new EntityData(); + entityData.setEntityId(deviceId); + entityData.setLatest(latest); + + PageData data = new PageData<>(List.of(entityData), 1, 1, true); + ReflectionTestUtils.setField(subCtx, "data", data); + } + + @Test + public void testSendLatestWsMsg() { + long ts = System.currentTimeMillis(); + List telemetry = List.of( + new BasicTsKvEntry(ts - 50000, new LongDataEntry("key", 42L), 34L), + new BasicTsKvEntry(ts - 20000, new LongDataEntry("key", 17L), 78L) + ); + + TelemetrySubscriptionUpdate subUpdate = new TelemetrySubscriptionUpdate(subscriptionId, telemetry); + + subCtx.sendWsMsg(sessionId, subUpdate, EntityKeyType.TIME_SERIES, true); + + Map> expectedLatest = new HashMap<>(); + Map expectedLatestCtxValues = new HashMap<>(); + expectedLatestCtxValues.put("key", new TsValue(ts - 20000, "17")); // use latest telemetry + expectedLatest.put(EntityKeyType.TIME_SERIES, expectedLatestCtxValues); + + EntityData expectedEntityData = new EntityData(); + expectedEntityData.setEntityId(deviceId); + expectedEntityData.setLatest(expectedLatest); + + List expected = List.of(expectedEntityData); + + ArgumentCaptor cmdUpdateCaptor = ArgumentCaptor.forClass(CmdUpdate.class); + then(webSocketService).should().sendUpdate(eq(sessionId), cmdUpdateCaptor.capture()); + CmdUpdate cmdUpdate = cmdUpdateCaptor.getValue(); + assertThat(cmdUpdate).isInstanceOf(EntityDataUpdate.class); + EntityDataUpdate entityDataUpdate = (EntityDataUpdate) cmdUpdate; + assertThat(entityDataUpdate.getCmdId()).isEqualTo(cmdId); + assertThat(entityDataUpdate.getData()).isNull(); + assertThat(entityDataUpdate.getUpdate()).isEqualTo(expected); + assertThat(entityDataUpdate.getAllowedEntities()).isEqualTo(maxEntitiesPerDataSubscription); + } + +} From c8b13fc3507645fb6d690d7b131708067b0b9891 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 7 May 2025 11:13:36 +0300 Subject: [PATCH 2/2] added await for test --- .../DefaultTbLocalSubscriptionServiceTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java index 623dd6b9e2..3bb470830b 100644 --- a/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionServiceTest.java @@ -49,8 +49,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -163,9 +165,13 @@ public class DefaultTbLocalSubscriptionServiceTest { TsKvEntry kv = new BasicTsKvEntry(ts, new StringDataEntry(key, "42")); subscriptionService.onTimeSeriesUpdate(deviceId, List.of(kv), mock(TbCallback.class)); - TelemetrySubscriptionUpdate update = capturedUpdate.get(); - assertNotNull(update); - assertTrue(update.getLatestValues().containsKey(key)); + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + TelemetrySubscriptionUpdate update = capturedUpdate.get(); + assertNotNull(update); + assertTrue(update.getLatestValues().containsKey(key)); + }); } }