Fix race conditions on WS subscribe commands processing

This commit is contained in:
Andrii Shvaika 2022-05-16 16:58:29 +03:00
parent f30e769ecc
commit ec7a36cd89
8 changed files with 78 additions and 64 deletions

View File

@ -228,7 +228,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
} else if (!theCtx.isInitialDataSent()) {
EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription());
wsService.sendWsMsg(theCtx.getSessionId(), update);
theCtx.sendWsMsg(update);
theCtx.setInitialDataSent(true);
}
} catch (RuntimeException e) {
@ -287,7 +287,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
ctx.clearEntitySubscriptions();
if (entities.isEmpty()) {
AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, 0, 0);
wsService.sendWsMsg(ctx.getSessionId(), update);
ctx.sendWsMsg(update);
} else {
ctx.fetchAlarms();
ctx.createLatestValuesSubscriptions(cmd.getQuery().getLatestValues());
@ -420,10 +420,11 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
wsService.sendWsMsg(ctx.getSessionId(),
new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
}
});
ctx.getWsLock().lock();
try {
EntityDataUpdate update;
if (!ctx.isInitialDataSent()) {
update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
@ -431,11 +432,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
} else {
update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription());
}
wsService.sendWsMsg(ctx.getSessionId(), update);
if (subscribe) {
ctx.createTimeseriesSubscriptions(keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), cmd.getStartTs(), cmd.getEndTs());
}
ctx.sendWsMsg(update);
ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());
} finally {
ctx.getWsLock().unlock();
}
return ctx;
}, wsCallBackExecutor);
}
@ -464,7 +468,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
ListenableFuture<List<TsKvEntry>> missingTsData = tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys);
missingTelemetryFutures.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor()));
}
Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback<List<Map<String, TsValue>>>() {
Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Map<String, TsValue>> result) {
missingTelemetryFutures.forEach((key, value) -> {
@ -475,30 +479,39 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
});
EntityDataUpdate update;
ctx.getWsLock().lock();
try {
ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
if (!ctx.isInitialDataSent()) {
update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
ctx.setInitialDataSent(true);
} else {
update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription());
}
wsService.sendWsMsg(ctx.getSessionId(), update);
ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
ctx.sendWsMsg(update);
} finally {
ctx.getWsLock().unlock();
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), latestCmd, t);
wsService.sendWsMsg(ctx.getSessionId(),
new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
}
}, wsCallBackExecutor);
} else {
ctx.getWsLock().lock();
try {
ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
if (!ctx.isInitialDataSent()) {
EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
wsService.sendWsMsg(ctx.getSessionId(), update);
ctx.sendWsMsg(update);
ctx.setInitialDataSent(true);
}
ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
} finally {
ctx.getWsLock().unlock();
}
}
}

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
import java.util.ArrayList;
@ -52,14 +53,18 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Data
public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
@Getter
protected final Lock wsLock = new ReentrantLock(true);
protected final String serviceId;
protected final SubscriptionServiceStatistics stats;
protected final TelemetryWebSocketService wsService;
private final TelemetryWebSocketService wsService;
protected final EntityService entityService;
protected final TbLocalSubscriptionService localSubscriptionService;
protected final AttributesService attributesService;
@ -314,4 +319,13 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
private final String sourceAttribute;
}
public void sendWsMsg(CmdUpdate update) {
wsLock.lock();
try {
wsService.sendWsMsg(sessionRef.getSessionId(), update);
} finally {
wsLock.unlock();
}
}
}

View File

@ -117,7 +117,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
} else {
update = new AlarmDataUpdate(cmdId, new PageData<>(), null, maxEntitiesPerAlarmSubscription, data.getTotalElements());
}
wsService.sendWsMsg(getSessionId(), update);
sendWsMsg(update);
}
public void fetchData() {
@ -198,7 +198,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
return alarm;
}).collect(Collectors.toList());
if (!update.isEmpty()) {
wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements()));
sendWsMsg(new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements()));
}
} else {
log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
@ -222,7 +222,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId());
updated.getLatest().putAll(current.getLatest());
alarmsMap.put(alarmId, updated);
wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements()));
sendWsMsg(new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements()));
} else {
fetchAlarms();
}

View File

@ -17,14 +17,11 @@ package org.thingsboard.server.service.subscription;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@Slf4j
public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
@ -40,7 +37,7 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
@Override
public void fetchData() {
result = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query);
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result));
sendWsMsg(new EntityCountUpdate(cmdId, result));
}
@Override
@ -48,7 +45,7 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
int newCount = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query);
if (newCount != result) {
result = newCount;
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result));
sendWsMsg(new EntityCountUpdate(cmdId, result));
}
}

View File

@ -51,7 +51,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
@Getter
@Setter
private boolean initialDataSent;
private volatile boolean initialDataSent;
private TimeSeriesCmd curTsCmd;
private LatestValueCmd latestValueCmd;
@Getter
@ -121,7 +121,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
if (!latestUpdate.isEmpty()) {
Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
entityData = new EntityData(entityId, latestMap, null);
wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
}
}
@ -162,7 +162,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
Map<String, TsValue[]> tsMap = new HashMap<>();
tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()])));
EntityData entityData = new EntityData(entityId, null, tsMap);
wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
sendWsMsg(new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
}
}
@ -219,9 +219,9 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
}
}
}
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
subsToAdd.forEach(localSubscriptionService::addSubscription);
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
}
public void setCurrentCmd(EntityDataCmd cmd) {

View File

@ -442,7 +442,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
private void handleWsAttributesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId,
List<String> keys) {
FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<>() {
@Override
public void onSuccess(List<AttributeKvEntry> data) {
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
@ -542,7 +542,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef,
AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<>() {
@Override
public void onSuccess(List<AttributeKvEntry> data) {
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
@ -666,7 +666,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List<String> keys) {
return new FutureCallback<List<TsKvEntry>>() {
return new FutureCallback<>() {
@Override
public void onSuccess(List<TsKvEntry> data) {
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));

View File

@ -102,7 +102,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
List<TsKvEntry> tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3);
sendTelemetry(device, tsData);
Thread.sleep(100);
update = getWsClient().sendHistoryCmd(keys, now, TimeUnit.HOURS.toMillis(1), dtf);
@ -136,7 +135,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
List<TsKvEntry> tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3);
sendTelemetry(device, tsData);
Thread.sleep(100);
update = getWsClient().subscribeTsUpdate(List.of("temperature"), now, TimeUnit.HOURS.toMillis(1));
Assert.assertEquals(1, update.getCmdId());
@ -153,7 +151,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
now = System.currentTimeMillis();
TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L));
getWsClient().registerWaitForUpdate();
Thread.sleep(100);
sendTelemetry(device, Arrays.asList(dataPoint4));
String msg = getWsClient().waitForUpdate();
@ -309,13 +306,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getTs());
Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getValue());
getWsClient().registerWaitForUpdate();
TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L));
List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
sendTelemetry(device, tsData);
Thread.sleep(100);
update = getWsClient().subscribeLatestUpdate(keys, dtf);
update = getWsClient().parseDataReply(getWsClient().waitForUpdate());
Assert.assertEquals(1, update.getCmdId());
@ -329,7 +325,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
now = System.currentTimeMillis();
TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L));
getWsClient().registerWaitForUpdate();
sendTelemetry(device, Arrays.asList(dataPoint2));
update = getWsClient().parseDataReply(getWsClient().waitForUpdate());
@ -371,7 +366,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
getWsClient().registerWaitForUpdate();
Thread.sleep(500);
AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
@ -394,7 +388,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("serverAttributeKey", 52L));
getWsClient().registerWaitForUpdate();
Thread.sleep(500);
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
msg = getWsClient().waitForUpdate();
Assert.assertNotNull(msg);
@ -411,14 +404,12 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
//Sending update from the past, while latest value has new timestamp;
getWsClient().registerWaitForUpdate();
Thread.sleep(500);
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint1));
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
//Sending duplicate update again
getWsClient().registerWaitForUpdate();
Thread.sleep(500);
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
@ -456,7 +447,6 @@ public abstract class BaseWebsocketApiTest extends AbstractControllerTest {
getWsClient().registerWaitForUpdate();
AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
Thread.sleep(100);
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);

View File

@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit;
public class TbTestWebSocketClient extends WebSocketClient {
private volatile String lastMsg;
private CountDownLatch reply;
private CountDownLatch update;
private volatile CountDownLatch reply;
private volatile CountDownLatch update;
public TbTestWebSocketClient(URI serverUri) {
super(serverUri);