WS session updates: remove 'synchronized' and use binary semaphore

This commit is contained in:
ViacheslavKlimov 2022-11-10 15:49:02 +02:00
parent 6e8dba6e2c
commit 80ca9cc787

View File

@ -52,13 +52,11 @@ import javax.websocket.Session;
import java.io.IOException;
import java.net.URI;
import java.security.InvalidParameterException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Semaphore;
import static org.thingsboard.server.service.ws.DefaultWebSocketService.NUMBER_OF_PING_ATTEMPTS;
@ -139,9 +137,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
if (!checkLimits(session, sessionRef)) {
return;
}
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ?
tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500));
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef));
externalSessionMap.put(externalSessionId, internalSessionId);
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
@ -216,22 +212,21 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
private final RemoteEndpoint.Async asyncRemote;
private final WebSocketSessionRef sessionRef;
private final AtomicBoolean isSending = new AtomicBoolean(false);
private final Queue<TbWebSocketMsg<?>> msgQueue;
// TODO: carefully review ( + discuss removal of the msgQueue)
private final Semaphore sendingSemaphore = new Semaphore(1);
private volatile long lastActivityTime;
SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef, int maxMsgQueuePerSession) {
SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef) {
super();
this.session = session;
Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
this.asyncRemote = nativeSession.getAsyncRemote();
this.sessionRef = sessionRef;
this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession);
this.lastActivityTime = System.currentTimeMillis();
}
synchronized void sendPing(long currentTime) {
void sendPing(long currentTime) {
try {
long timeSinceLastActivity = currentTime - lastActivityTime;
if (timeSinceLastActivity >= pingTimeout) {
@ -246,40 +241,37 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
}
}
private void closeSession(CloseStatus reason) {
void closeSession(CloseStatus reason) {
try {
close(this.sessionRef, reason);
} catch (IOException ioe) {
log.trace("[{}] Session transport error", session.getId(), ioe);
} finally {
sendingSemaphore.release();
}
}
synchronized void processPongMessage(long currentTime) {
void processPongMessage(long currentTime) {
lastActivityTime = currentTime;
}
synchronized void sendMsg(String msg) {
void sendMsg(String msg) {
sendMsg(new TbWebSocketTextMsg(msg));
}
synchronized void sendMsg(TbWebSocketMsg<?> msg) {
if (isSending.compareAndSet(false, true)) {
sendMsgInternal(msg);
} else {
void sendMsg(TbWebSocketMsg<?> msg) {
try {
msgQueue.add(msg);
} catch (RuntimeException e) {
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId(), e);
} else {
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId());
}
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
}
sendingSemaphore.acquire();
sendMsgInternal(msg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (Exception e) {
sendingSemaphore.release();
throw e;
}
}
private void sendMsgInternal(TbWebSocketMsg<?> msg) {
void sendMsgInternal(TbWebSocketMsg<?> msg) {
try {
if (TbWebSocketMsgType.TEXT.equals(msg.getType())) {
TbWebSocketTextMsg textMsg = (TbWebSocketTextMsg) msg;
@ -287,7 +279,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} else {
TbWebSocketPingMsg pingMsg = (TbWebSocketPingMsg) msg;
this.asyncRemote.sendPing(pingMsg.getMsg());
processNextMsg();
}
} catch (Exception e) {
log.trace("[{}] Failed to send msg", session.getId(), e);
@ -301,16 +292,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
} else {
processNextMsg();
}
}
private void processNextMsg() {
TbWebSocketMsg<?> msg = msgQueue.poll();
if (msg != null) {
sendMsgInternal(msg);
} else {
isSending.set(false);
sendingSemaphore.release();
}
}
}