Merge branch 'fix/telemetry-bug' of github.com:irynamatveieva/thingsboard into fix/telemetry-bug
This commit is contained in:
		
						commit
						c93a82a627
					
				@ -348,7 +348,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
 | 
			
		||||
                        if (sub.isLatestValues()) {
 | 
			
		||||
                            for (TsKvEntry kv : data) {
 | 
			
		||||
                                Long stateTs = keyStates.get(kv.getKey());
 | 
			
		||||
                                if (stateTs == null || kv.getTs() > stateTs) {
 | 
			
		||||
                                if (stateTs == null || kv.getTs() >= stateTs) {
 | 
			
		||||
                                    if (updateData == null) {
 | 
			
		||||
                                        updateData = new ArrayList<>();
 | 
			
		||||
                                    }
 | 
			
		||||
@ -362,7 +362,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
 | 
			
		||||
                        for (TsKvEntry kv : data) {
 | 
			
		||||
                            Long stateTs = keyStates.get(kv.getKey());
 | 
			
		||||
                            if (stateTs != null) {
 | 
			
		||||
                                if (!sub.isLatestValues() || kv.getTs() > stateTs) {
 | 
			
		||||
                                if (!sub.isLatestValues() || kv.getTs() >= stateTs) {
 | 
			
		||||
                                    if (updateData == null) {
 | 
			
		||||
                                        updateData = new ArrayList<>();
 | 
			
		||||
                                    }
 | 
			
		||||
 | 
			
		||||
@ -94,35 +94,36 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
 | 
			
		||||
 | 
			
		||||
    private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
 | 
			
		||||
        Map<String, TsValue> latestUpdate = new HashMap<>();
 | 
			
		||||
        subscriptionUpdate.getData().forEach((k, v) -> {
 | 
			
		||||
            Object[] data = (Object[]) v.get(0);
 | 
			
		||||
            latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
 | 
			
		||||
        });
 | 
			
		||||
        subscriptionUpdate.getData().forEach((key, data) -> data.stream()
 | 
			
		||||
                .filter(o -> o instanceof Object[] && ((Object[]) o).length > 0 && ((Object[]) o)[0] instanceof Long)
 | 
			
		||||
                .max(Comparator.comparingLong(o -> (Long) ((Object[]) o)[0]))
 | 
			
		||||
                .ifPresent(max -> {
 | 
			
		||||
                    Object[] arr = (Object[]) max;
 | 
			
		||||
                    latestUpdate.put(key, new TsValue((Long) arr[0], (String) arr[1]));
 | 
			
		||||
                }));
 | 
			
		||||
        EntityData entityData = getDataForEntity(entityId);
 | 
			
		||||
        if (entityData != null && entityData.getLatest() != null) {
 | 
			
		||||
            Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
 | 
			
		||||
            Map<String, TsValue> latestCtxValues = entityData.getLatest().computeIfAbsent(keyType, key -> new HashMap<>());
 | 
			
		||||
            log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
 | 
			
		||||
            if (latestCtxValues != null) {
 | 
			
		||||
                latestCtxValues.forEach((k, v) -> {
 | 
			
		||||
                    TsValue update = latestUpdate.get(k);
 | 
			
		||||
                    if (update != null) {
 | 
			
		||||
                        //Ignore notifications about deleted keys
 | 
			
		||||
                        if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) {
 | 
			
		||||
                            if (update.getTs() < v.getTs()) {
 | 
			
		||||
                                log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
 | 
			
		||||
                                latestUpdate.remove(k);
 | 
			
		||||
                            } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
 | 
			
		||||
                                log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
 | 
			
		||||
                                latestUpdate.remove(k);
 | 
			
		||||
                            }
 | 
			
		||||
                        } else {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k);
 | 
			
		||||
            latestCtxValues.forEach((k, v) -> {
 | 
			
		||||
                TsValue update = latestUpdate.get(k);
 | 
			
		||||
                if (update != null) {
 | 
			
		||||
                    //Ignore notifications about deleted keys
 | 
			
		||||
                    if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) {
 | 
			
		||||
                        if (update.getTs() < v.getTs()) {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
 | 
			
		||||
                            latestUpdate.remove(k);
 | 
			
		||||
                        } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
 | 
			
		||||
                            log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
 | 
			
		||||
                            latestUpdate.remove(k);
 | 
			
		||||
                        }
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
                //Setting new values
 | 
			
		||||
                latestUpdate.forEach(latestCtxValues::put);
 | 
			
		||||
            }
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            //Setting new values
 | 
			
		||||
            latestCtxValues.putAll(latestUpdate);
 | 
			
		||||
        }
 | 
			
		||||
        if (!latestUpdate.isEmpty()) {
 | 
			
		||||
            Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
 | 
			
		||||
 | 
			
		||||
@ -32,11 +32,16 @@ import org.thingsboard.server.cache.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.limit.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.service.ws.WebSocketSessionRef;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
@ -44,8 +49,13 @@ import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicReference;
 | 
			
		||||
 | 
			
		||||
import static org.awaitility.Awaitility.await;
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.assertFalse;
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
 | 
			
		||||
import static org.junit.jupiter.api.Assertions.assertTrue;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.nullable;
 | 
			
		||||
@ -54,11 +64,17 @@ import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
public class DefaultTbLocalSubscriptionServiceTest {
 | 
			
		||||
 | 
			
		||||
    ListAppender<ILoggingEvent> testLogAppender;
 | 
			
		||||
    TbLocalSubscriptionService subscriptionService;
 | 
			
		||||
    private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
    private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
    private ListAppender<ILoggingEvent> testLogAppender;
 | 
			
		||||
    private TbLocalSubscriptionService subscriptionService;
 | 
			
		||||
    private ListeningExecutorService executorService;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        this.executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
 | 
			
		||||
 | 
			
		||||
        Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);
 | 
			
		||||
        testLogAppender = new ListAppender<>();
 | 
			
		||||
        testLogAppender.start();
 | 
			
		||||
@ -69,7 +85,9 @@ public class DefaultTbLocalSubscriptionServiceTest {
 | 
			
		||||
        PartitionService partitionService = mock();
 | 
			
		||||
        when(partitionService.resolve(any(), any(), any())).thenReturn(TopicPartitionInfo.builder().build());
 | 
			
		||||
        subscriptionService = new DefaultTbLocalSubscriptionService(mock(), mock(), mock(), partitionService, mock(), mock(), mock(), rateLimitService);
 | 
			
		||||
 | 
			
		||||
        ReflectionTestUtils.setField(subscriptionService, "serviceId", "serviceId");
 | 
			
		||||
        ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @AfterEach
 | 
			
		||||
@ -79,27 +97,22 @@ public class DefaultTbLocalSubscriptionServiceTest {
 | 
			
		||||
            Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);
 | 
			
		||||
            logger.detachAppender(testLogAppender);
 | 
			
		||||
        }
 | 
			
		||||
        if (executorService != null) {
 | 
			
		||||
            executorService.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void addSubscriptionConcurrentModificationTest() throws Exception {
 | 
			
		||||
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
 | 
			
		||||
        TenantId tenantId = new TenantId(UUID.randomUUID());
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
        WebSocketSessionRef sessionRef = mock();
 | 
			
		||||
        ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService);
 | 
			
		||||
 | 
			
		||||
        List<ListenableFuture<?>> futures = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType());
 | 
			
		||||
            for (int i = 0; i < 50; i++) {
 | 
			
		||||
                futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef)));
 | 
			
		||||
            }
 | 
			
		||||
            Futures.allAsList(futures).get();
 | 
			
		||||
        } finally {
 | 
			
		||||
            executorService.shutdownNow();
 | 
			
		||||
        subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType());
 | 
			
		||||
        for (int i = 0; i < 50; i++) {
 | 
			
		||||
            futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef)));
 | 
			
		||||
        }
 | 
			
		||||
        Futures.allAsList(futures).get();
 | 
			
		||||
 | 
			
		||||
        List<ILoggingEvent> logs = testLogAppender.list;
 | 
			
		||||
        boolean exceptionLogged = logs.stream()
 | 
			
		||||
@ -123,4 +136,42 @@ public class DefaultTbLocalSubscriptionServiceTest {
 | 
			
		||||
                .keyStates(keys)
 | 
			
		||||
                .build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void updateOldTelemetryTest() {
 | 
			
		||||
        WebSocketSessionRef sessionRef = mock();
 | 
			
		||||
 | 
			
		||||
        String key = "temperature";
 | 
			
		||||
        long ts = 1237465456L;
 | 
			
		||||
 | 
			
		||||
        Map<String, Long> keyStates = new HashMap<>();
 | 
			
		||||
        keyStates.put(key, ts);
 | 
			
		||||
 | 
			
		||||
        AtomicReference<TelemetrySubscriptionUpdate> capturedUpdate = new AtomicReference<>();
 | 
			
		||||
        TbTimeSeriesSubscription tsSubscription = TbTimeSeriesSubscription.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .entityId(deviceId)
 | 
			
		||||
                .subscriptionId(2)
 | 
			
		||||
                .sessionId(RandomStringUtils.randomAlphanumeric(5))
 | 
			
		||||
                .keyStates(keyStates)
 | 
			
		||||
                .allKeys(true)
 | 
			
		||||
                .latestValues(true)
 | 
			
		||||
                .updateProcessor((subscription, update) -> capturedUpdate.set(update))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        subscriptionService.addSubscription(tsSubscription, sessionRef);
 | 
			
		||||
 | 
			
		||||
        // Send telemetry with ts == stateTs
 | 
			
		||||
        TsKvEntry kv = new BasicTsKvEntry(ts, new StringDataEntry(key, "42"));
 | 
			
		||||
        subscriptionService.onTimeSeriesUpdate(deviceId, List.of(kv), mock(TbCallback.class));
 | 
			
		||||
 | 
			
		||||
        await().atMost(5, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(5, TimeUnit.MILLISECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    TelemetrySubscriptionUpdate update = capturedUpdate.get();
 | 
			
		||||
                    assertNotNull(update);
 | 
			
		||||
                    assertTrue(update.getLatestValues().containsKey(key));
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,130 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.subscription;
 | 
			
		||||
 | 
			
		||||
import org.apache.commons.lang3.RandomStringUtils;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
			
		||||
import org.thingsboard.server.common.data.query.TsValue;
 | 
			
		||||
import org.thingsboard.server.service.ws.WebSocketService;
 | 
			
		||||
import org.thingsboard.server.service.ws.WebSocketSessionRef;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.BDDMockito.then;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
public class TbEntityDataSubCtxTest {
 | 
			
		||||
 | 
			
		||||
    private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
    private final Integer cmdId = RandomUtils.nextInt();
 | 
			
		||||
    private final Integer subscriptionId = RandomUtils.nextInt();
 | 
			
		||||
    private final String serviceId = RandomStringUtils.randomAlphanumeric(10);
 | 
			
		||||
    private final String sessionId = RandomStringUtils.randomAlphanumeric(10);
 | 
			
		||||
 | 
			
		||||
    private final int maxEntitiesPerDataSubscription = 100;
 | 
			
		||||
 | 
			
		||||
    private TbEntityDataSubCtx subCtx;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private WebSocketService webSocketService;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private WebSocketSessionRef webSocketSessionRef;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void setUp() {
 | 
			
		||||
        when(webSocketSessionRef.getSessionId()).thenReturn(sessionId);
 | 
			
		||||
        subCtx = new TbEntityDataSubCtx(serviceId, webSocketService, mock(), mock(), mock(), mock(), webSocketSessionRef, cmdId, maxEntitiesPerDataSubscription);
 | 
			
		||||
 | 
			
		||||
        Map<Integer, EntityId> subToEntityIdMap = new HashMap<>();
 | 
			
		||||
        subToEntityIdMap.put(subscriptionId, deviceId);
 | 
			
		||||
        ReflectionTestUtils.setField(subCtx, "subToEntityIdMap", subToEntityIdMap);
 | 
			
		||||
 | 
			
		||||
        long now = System.currentTimeMillis();
 | 
			
		||||
        long oldTs = now - 1_000_000;
 | 
			
		||||
 | 
			
		||||
        Map<String, TsValue> latestCtxValues = new HashMap<>();
 | 
			
		||||
        latestCtxValues.put("key", new TsValue(oldTs, "15"));
 | 
			
		||||
        Map<EntityKeyType, Map<String, TsValue>> latest = new HashMap<>();
 | 
			
		||||
        latest.put(EntityKeyType.TIME_SERIES, latestCtxValues);
 | 
			
		||||
 | 
			
		||||
        EntityData entityData = new EntityData();
 | 
			
		||||
        entityData.setEntityId(deviceId);
 | 
			
		||||
        entityData.setLatest(latest);
 | 
			
		||||
 | 
			
		||||
        PageData<EntityData> data = new PageData<>(List.of(entityData), 1, 1, true);
 | 
			
		||||
        ReflectionTestUtils.setField(subCtx, "data", data);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSendLatestWsMsg() {
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        List<TsKvEntry> telemetry = List.of(
 | 
			
		||||
                new BasicTsKvEntry(ts - 50000, new LongDataEntry("key", 42L), 34L),
 | 
			
		||||
                new BasicTsKvEntry(ts - 20000, new LongDataEntry("key", 17L), 78L)
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TelemetrySubscriptionUpdate subUpdate = new TelemetrySubscriptionUpdate(subscriptionId, telemetry);
 | 
			
		||||
 | 
			
		||||
        subCtx.sendWsMsg(sessionId, subUpdate, EntityKeyType.TIME_SERIES, true);
 | 
			
		||||
 | 
			
		||||
        Map<EntityKeyType, Map<String, TsValue>> expectedLatest = new HashMap<>();
 | 
			
		||||
        Map<String, TsValue> expectedLatestCtxValues = new HashMap<>();
 | 
			
		||||
        expectedLatestCtxValues.put("key", new TsValue(ts - 20000, "17")); // use latest telemetry
 | 
			
		||||
        expectedLatest.put(EntityKeyType.TIME_SERIES, expectedLatestCtxValues);
 | 
			
		||||
 | 
			
		||||
        EntityData expectedEntityData = new EntityData();
 | 
			
		||||
        expectedEntityData.setEntityId(deviceId);
 | 
			
		||||
        expectedEntityData.setLatest(expectedLatest);
 | 
			
		||||
 | 
			
		||||
        List<EntityData> expected = List.of(expectedEntityData);
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<CmdUpdate> cmdUpdateCaptor = ArgumentCaptor.forClass(CmdUpdate.class);
 | 
			
		||||
        then(webSocketService).should().sendUpdate(eq(sessionId), cmdUpdateCaptor.capture());
 | 
			
		||||
        CmdUpdate cmdUpdate = cmdUpdateCaptor.getValue();
 | 
			
		||||
        assertThat(cmdUpdate).isInstanceOf(EntityDataUpdate.class);
 | 
			
		||||
        EntityDataUpdate entityDataUpdate = (EntityDataUpdate) cmdUpdate;
 | 
			
		||||
        assertThat(entityDataUpdate.getCmdId()).isEqualTo(cmdId);
 | 
			
		||||
        assertThat(entityDataUpdate.getData()).isNull();
 | 
			
		||||
        assertThat(entityDataUpdate.getUpdate()).isEqualTo(expected);
 | 
			
		||||
        assertThat(entityDataUpdate.getAllowedEntities()).isEqualTo(maxEntitiesPerDataSubscription);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user