diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index e0eb4820fd..82d9eb779a 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -228,7 +228,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } } else if (!theCtx.isInitialDataSent()) { EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription()); - wsService.sendWsMsg(theCtx.getSessionId(), update); + theCtx.sendWsMsg(update); theCtx.setInitialDataSent(true); } } catch (RuntimeException e) { @@ -287,7 +287,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc ctx.clearEntitySubscriptions(); if (entities.isEmpty()) { AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, 0, 0); - wsService.sendWsMsg(ctx.getSessionId(), update); + ctx.sendWsMsg(update); } else { ctx.fetchAlarms(); ctx.createLatestValuesSubscriptions(cmd.getQuery().getLatestValues()); @@ -420,22 +420,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } } catch (InterruptedException | ExecutionException e) { log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e); - wsService.sendWsMsg(ctx.getSessionId(), - new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); + ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); } }); - EntityDataUpdate update; - if (!ctx.isInitialDataSent()) { - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); - ctx.setInitialDataSent(true); - } else { - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription()); + ctx.getWsLock().lock(); + try { + EntityDataUpdate update; + if (!ctx.isInitialDataSent()) { + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); + ctx.setInitialDataSent(true); + } else { + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription()); + } + if (subscribe) { + ctx.createTimeseriesSubscriptions(keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), cmd.getStartTs(), cmd.getEndTs()); + } + ctx.sendWsMsg(update); + ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear()); + } finally { + ctx.getWsLock().unlock(); } - wsService.sendWsMsg(ctx.getSessionId(), update); - if (subscribe) { - ctx.createTimeseriesSubscriptions(keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), cmd.getStartTs(), cmd.getEndTs()); - } - ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear()); return ctx; }, wsCallBackExecutor); } @@ -464,7 +468,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc ListenableFuture> missingTsData = tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys); missingTelemetryFutures.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor())); } - Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback>>() { + Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback<>() { @Override public void onSuccess(@Nullable List> result) { missingTelemetryFutures.forEach((key, value) -> { @@ -475,30 +479,39 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } }); EntityDataUpdate update; - if (!ctx.isInitialDataSent()) { - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); - ctx.setInitialDataSent(true); - } else { - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription()); + ctx.getWsLock().lock(); + try { + ctx.createLatestValuesSubscriptions(latestCmd.getKeys()); + if (!ctx.isInitialDataSent()) { + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); + ctx.setInitialDataSent(true); + } else { + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription()); + } + ctx.sendWsMsg(update); + } finally { + ctx.getWsLock().unlock(); } - wsService.sendWsMsg(ctx.getSessionId(), update); - ctx.createLatestValuesSubscriptions(latestCmd.getKeys()); } @Override public void onFailure(Throwable t) { log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), latestCmd, t); - wsService.sendWsMsg(ctx.getSessionId(), - new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!")); + ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!")); } }, wsCallBackExecutor); } else { - if (!ctx.isInitialDataSent()) { - EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); - wsService.sendWsMsg(ctx.getSessionId(), update); - ctx.setInitialDataSent(true); + ctx.getWsLock().lock(); + try { + ctx.createLatestValuesSubscriptions(latestCmd.getKeys()); + if (!ctx.isInitialDataSent()) { + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); + ctx.sendWsMsg(update); + ctx.setInitialDataSent(true); + } + } finally { + ctx.getWsLock().unlock(); } - ctx.createLatestValuesSubscriptions(latestCmd.getKeys()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java index 749825f644..a8d2472eb0 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -41,6 +41,7 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; @@ -52,14 +53,18 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @Data public abstract class TbAbstractSubCtx { + @Getter + protected final Lock wsLock = new ReentrantLock(true); protected final String serviceId; protected final SubscriptionServiceStatistics stats; - protected final TelemetryWebSocketService wsService; + private final TelemetryWebSocketService wsService; protected final EntityService entityService; protected final TbLocalSubscriptionService localSubscriptionService; protected final AttributesService attributesService; @@ -314,4 +319,13 @@ public abstract class TbAbstractSubCtx { private final String sourceAttribute; } + public void sendWsMsg(CmdUpdate update) { + wsLock.lock(); + try { + wsService.sendWsMsg(sessionRef.getSessionId(), update); + } finally { + wsLock.unlock(); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index d87f1557ff..8655a91e96 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -117,7 +117,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { } else { update = new AlarmDataUpdate(cmdId, new PageData<>(), null, maxEntitiesPerAlarmSubscription, data.getTotalElements()); } - wsService.sendWsMsg(getSessionId(), update); + sendWsMsg(update); } public void fetchData() { @@ -198,7 +198,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { return alarm; }).collect(Collectors.toList()); if (!update.isEmpty()) { - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements())); + sendWsMsg(new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements())); } } else { log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); @@ -222,7 +222,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); updated.getLatest().putAll(current.getLatest()); alarmsMap.put(alarmId, updated); - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements())); + sendWsMsg(new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements())); } else { fetchAlarms(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java index 53df7c4aaf..7fc87eb17c 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java @@ -17,14 +17,11 @@ package org.thingsboard.server.service.subscription; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.query.EntityCountQuery; -import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; -import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; -import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; @Slf4j public class TbEntityCountSubCtx extends TbAbstractSubCtx { @@ -40,7 +37,7 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx { @Override public void fetchData() { result = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); - wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); + sendWsMsg(new EntityCountUpdate(cmdId, result)); } @Override @@ -48,7 +45,7 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx { int newCount = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); if (newCount != result) { result = newCount; - wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); + sendWsMsg(new EntityCountUpdate(cmdId, result)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index b8211e50cc..6423604c4e 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -51,7 +51,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { @Getter @Setter - private boolean initialDataSent; + private volatile boolean initialDataSent; private TimeSeriesCmd curTsCmd; private LatestValueCmd latestValueCmd; @Getter @@ -121,7 +121,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { if (!latestUpdate.isEmpty()) { Map> latestMap = Collections.singletonMap(keyType, latestUpdate); entityData = new EntityData(entityId, latestMap, null); - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); + sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); } } @@ -162,7 +162,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { Map tsMap = new HashMap<>(); tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()]))); EntityData entityData = new EntityData(entityId, null, tsMap); - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); + sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription)); } } @@ -219,9 +219,9 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { } } } - wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); subsToAdd.forEach(localSubscriptionService::addSubscription); + sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); } public void setCurrentCmd(EntityDataCmd cmd) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 67fe9346a7..5a50a2ea22 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -442,7 +442,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi private void handleWsAttributesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId, List keys) { - FutureCallback> callback = new FutureCallback>() { + FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); @@ -542,7 +542,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { - FutureCallback> callback = new FutureCallback>() { + FutureCallback> callback = new FutureCallback<>() { @Override public void onSuccess(List data) { List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); @@ -666,7 +666,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } private FutureCallback> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List keys) { - return new FutureCallback>() { + return new FutureCallback<>() { @Override public void onSuccess(List data) { sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 90d6ac7b9d..f7838d9689 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -102,7 +102,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { List tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3); sendTelemetry(device, tsData); - Thread.sleep(100); update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf); @@ -136,7 +135,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { List tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3); sendTelemetry(device, tsData); - Thread.sleep(100); update = getWsClient().subscribeTsUpdate(List.of("temperature"), now, TimeUnit.HOURS.toMillis(1)); Assert.assertEquals(1, update.getCmdId()); @@ -153,7 +151,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { now = System.currentTimeMillis(); TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L)); getWsClient().registerWaitForUpdate(); - Thread.sleep(100); sendTelemetry(device, Arrays.asList(dataPoint4)); String msg = getWsClient().waitForUpdate(); @@ -309,13 +306,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getTs()); Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getValue()); + getWsClient().registerWaitForUpdate(); TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L)); List tsData = Arrays.asList(dataPoint1); sendTelemetry(device, tsData); - Thread.sleep(100); - - update = getWsClient().subscribeLatestUpdate(keys, dtf); + update = getWsClient().parseDataReply(getWsClient().waitForUpdate()); Assert.assertEquals(1, update.getCmdId()); @@ -329,7 +325,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { now = System.currentTimeMillis(); TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L)); - getWsClient().registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint2)); update = getWsClient().parseDataReply(getWsClient().waitForUpdate()); @@ -371,7 +366,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue()); getWsClient().registerWaitForUpdate(); - Thread.sleep(500); AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L)); List tsData = Arrays.asList(dataPoint1); @@ -394,7 +388,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("serverAttributeKey", 52L)); getWsClient().registerWaitForUpdate(); - Thread.sleep(500); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2)); msg = getWsClient().waitForUpdate(); Assert.assertNotNull(msg); @@ -411,14 +404,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { //Sending update from the past, while latest value has new timestamp; getWsClient().registerWaitForUpdate(); - Thread.sleep(500); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint1)); msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); Assert.assertNull(msg); //Sending duplicate update again getWsClient().registerWaitForUpdate(); - Thread.sleep(500); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2)); msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); Assert.assertNull(msg); @@ -456,7 +447,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest { getWsClient().registerWaitForUpdate(); AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L)); List tsData = Arrays.asList(dataPoint1); - Thread.sleep(100); sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData); diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 6af6176b06..e2031e701f 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit; public class TbTestWebSocketClient extends WebSocketClient { private volatile String lastMsg; - private CountDownLatch reply; - private CountDownLatch update; + private volatile CountDownLatch reply; + private volatile CountDownLatch update; public TbTestWebSocketClient(URI serverUri) { super(serverUri);