Mapping of cmdId to unique sequence per subscription to fix race condition with entity data query commands

This commit is contained in:
Andrii Shvaika 2024-02-15 14:45:38 +02:00
parent 841f9223c0
commit c5b74ff932
7 changed files with 60 additions and 27 deletions

View File

@ -36,8 +36,6 @@ public abstract class TbSubscription<T> {
private final TbSubscriptionType type; private final TbSubscriptionType type;
private final BiConsumer<TbSubscription<T>, T> updateProcessor; private final BiConsumer<TbSubscription<T>, T> updateProcessor;
protected final AtomicInteger sequence = new AtomicInteger();
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;

View File

@ -259,13 +259,13 @@ public class DefaultWebSocketService implements WebSocketService {
} }
@Override @Override
public void sendUpdate(String sessionId, TelemetrySubscriptionUpdate update) { public void sendUpdate(String sessionId, int cmdId, TelemetrySubscriptionUpdate update) {
sendUpdate(sessionId, update.getSubscriptionId(), update); doSendUpdate(sessionId, cmdId, update);
} }
@Override @Override
public void sendUpdate(String sessionId, CmdUpdate update) { public void sendUpdate(String sessionId, CmdUpdate update) {
sendUpdate(sessionId, update.getCmdId(), update); doSendUpdate(sessionId, update.getCmdId(), update);
} }
@Override @Override
@ -274,7 +274,7 @@ public class DefaultWebSocketService implements WebSocketService {
sendUpdate(sessionRef, update); sendUpdate(sessionRef, update);
} }
private <T> void sendUpdate(String sessionId, int cmdId, T update) { private <T> void doSendUpdate(String sessionId, int cmdId, T update) {
WsSessionMetaData md = wsSessionsMap.get(sessionId); WsSessionMetaData md = wsSessionsMap.get(sessionId);
if (md != null) { if (md != null) {
sendUpdate(md.getSessionRef(), cmdId, update); sendUpdate(md.getSessionRef(), cmdId, update);
@ -288,7 +288,7 @@ public class DefaultWebSocketService implements WebSocketService {
try { try {
msgEndpoint.close(md.getSessionRef(), status); msgEndpoint.close(md.getSessionRef(), status);
} catch (IOException e) { } catch (IOException e) {
log.warn("[{}] Failed to send session close: {}", sessionId, e); log.warn("[{}] Failed to send session close", sessionId, e);
} }
} }
} }
@ -439,7 +439,7 @@ public class DefaultWebSocketService implements WebSocketService {
TbAttributeSubscription sub = TbAttributeSubscription.builder() TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionId) .sessionId(sessionId)
.subscriptionId(cmd.getCmdId()) .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
.queryTs(queryTs) .queryTs(queryTs)
@ -449,7 +449,7 @@ public class DefaultWebSocketService implements WebSocketService {
.updateProcessor((subscription, update) -> { .updateProcessor((subscription, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendUpdate(subscription.getSessionId(), update); sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
@ -545,7 +545,7 @@ public class DefaultWebSocketService implements WebSocketService {
TbAttributeSubscription sub = TbAttributeSubscription.builder() TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionId) .sessionId(sessionId)
.subscriptionId(cmd.getCmdId()) .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
.queryTs(queryTs) .queryTs(queryTs)
@ -554,7 +554,7 @@ public class DefaultWebSocketService implements WebSocketService {
.updateProcessor((subscription, update) -> { .updateProcessor((subscription, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendUpdate(subscription.getSessionId(), update); sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
@ -643,13 +643,13 @@ public class DefaultWebSocketService implements WebSocketService {
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionId) .sessionId(sessionId)
.subscriptionId(cmd.getCmdId()) .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
.updateProcessor((subscription, update) -> { .updateProcessor((subscription, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendUpdate(subscription.getSessionId(), update); sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
@ -698,13 +698,13 @@ public class DefaultWebSocketService implements WebSocketService {
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionId) .sessionId(sessionId)
.subscriptionId(cmd.getCmdId()) .subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
.updateProcessor((subscription, update) -> { .updateProcessor((subscription, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendUpdate(subscription.getSessionId(), update); sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
@ -836,7 +836,7 @@ public class DefaultWebSocketService implements WebSocketService {
try { try {
msgEndpoint.sendPing(md.getSessionRef(), currentTime); msgEndpoint.sendPing(md.getSessionRef(), currentTime);
} catch (IOException e) { } catch (IOException e) {
log.warn("[{}] Failed to send ping: {}", md.getSessionRef().getSessionId(), e); log.warn("[{}] Failed to send ping:", md.getSessionRef().getSessionId(), e);
} }
})); }));
} }

View File

@ -29,7 +29,7 @@ public interface WebSocketService {
void handleCommands(WebSocketSessionRef sessionRef, WsCommandsWrapper commandsWrapper); void handleCommands(WebSocketSessionRef sessionRef, WsCommandsWrapper commandsWrapper);
void sendUpdate(String sessionId, TelemetrySubscriptionUpdate update); void sendUpdate(String sessionId, int cmdId, TelemetrySubscriptionUpdate update);
void sendUpdate(String sessionId, CmdUpdate update); void sendUpdate(String sessionId, CmdUpdate update);

View File

@ -115,7 +115,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
private void fetchUnreadNotificationsCount(NotificationsCountSubscription subscription) { private void fetchUnreadNotificationsCount(NotificationsCountSubscription subscription) {
log.trace("[{}, subId: {}] Fetching unread notifications count from DB", subscription.getSessionId(), subscription.getSubscriptionId()); log.trace("[{}, subId: {}] Fetching unread notifications count from DB", subscription.getSessionId(), subscription.getSubscriptionId());
int unreadCount = notificationService.countUnreadNotificationsByRecipientId(subscription.getTenantId(), (UserId) subscription.getEntityId()); int unreadCount = notificationService.countUnreadNotificationsByRecipientId(subscription.getTenantId(), (UserId) subscription.getEntityId());
subscription.getUnreadCounter().set(unreadCount); subscription.getTotalUnreadCounter().set(unreadCount);
} }
@ -196,20 +196,20 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
private void handleNotificationUpdate(NotificationsCountSubscription subscription, NotificationUpdate update) { private void handleNotificationUpdate(NotificationsCountSubscription subscription, NotificationUpdate update) {
log.trace("[{}, subId: {}] Handling notification update for count sub: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update); log.trace("[{}, subId: {}] Handling notification update for count sub: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update);
if (update.isCreated()) { if (update.isCreated()) {
subscription.getUnreadCounter().incrementAndGet(); subscription.getTotalUnreadCounter().incrementAndGet();
sendUpdate(subscription.getSessionId(), subscription.createUpdate()); sendUpdate(subscription.getSessionId(), subscription.createUpdate());
} else if (update.isUpdated()) { } else if (update.isUpdated()) {
if (update.getNewStatus() == NotificationStatus.READ) { if (update.getNewStatus() == NotificationStatus.READ) {
if (update.isAllNotifications()) { if (update.isAllNotifications()) {
fetchUnreadNotificationsCount(subscription); fetchUnreadNotificationsCount(subscription);
} else { } else {
subscription.getUnreadCounter().decrementAndGet(); subscription.getTotalUnreadCounter().decrementAndGet();
} }
sendUpdate(subscription.getSessionId(), subscription.createUpdate()); sendUpdate(subscription.getSessionId(), subscription.createUpdate());
} }
} else if (update.isDeleted()) { } else if (update.isDeleted()) {
if (update.getNotification().getStatus() != NotificationStatus.READ) { if (update.getNotification().getStatus() != NotificationStatus.READ) {
subscription.getUnreadCounter().decrementAndGet(); subscription.getTotalUnreadCounter().decrementAndGet();
sendUpdate(subscription.getSessionId(), subscription.createUpdate()); sendUpdate(subscription.getSessionId(), subscription.createUpdate());
} }
} }

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ws.notification.sub;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.service.subscription.TbSubscription;
import org.thingsboard.server.service.subscription.TbSubscriptionType;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@Getter
public abstract class AbstractNotificationSubscription<T> extends TbSubscription<T> {
protected final AtomicInteger sequence = new AtomicInteger();
protected final AtomicInteger totalUnreadCounter = new AtomicInteger();
public AbstractNotificationSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, TbSubscriptionType type, BiConsumer<TbSubscription<T>, T> updateProcessor) {
super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateProcessor);
}
}

View File

@ -27,9 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@Getter @Getter
public class NotificationsCountSubscription extends TbSubscription<NotificationsSubscriptionUpdate> { public class NotificationsCountSubscription extends AbstractNotificationSubscription<NotificationsSubscriptionUpdate> {
private final AtomicInteger unreadCounter = new AtomicInteger();
@Builder @Builder
public NotificationsCountSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, public NotificationsCountSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
@ -40,7 +38,7 @@ public class NotificationsCountSubscription extends TbSubscription<Notifications
public UnreadNotificationsCountUpdate createUpdate() { public UnreadNotificationsCountUpdate createUpdate() {
return UnreadNotificationsCountUpdate.builder() return UnreadNotificationsCountUpdate.builder()
.cmdId(getSubscriptionId()) .cmdId(getSubscriptionId())
.totalUnreadCount(unreadCounter.get()) .totalUnreadCount(totalUnreadCounter.get())
.sequenceNumber(sequence.incrementAndGet()) .sequenceNumber(sequence.incrementAndGet())
.build(); .build();
} }

View File

@ -35,11 +35,10 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Getter @Getter
public class NotificationsSubscription extends TbSubscription<NotificationsSubscriptionUpdate> { public class NotificationsSubscription extends AbstractNotificationSubscription<NotificationsSubscriptionUpdate> {
private final Map<UUID, Notification> latestUnreadNotifications = new HashMap<>(); private final Map<UUID, Notification> latestUnreadNotifications = new HashMap<>();
private final int limit; private final int limit;
private final AtomicInteger totalUnreadCounter = new AtomicInteger();
@Builder @Builder
public NotificationsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, public NotificationsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,