cmdsHandlers structure refactoring

This commit is contained in:
ViacheslavKlimov 2023-12-11 16:11:09 +02:00
parent 131f6bf721
commit 4ff0813b07
2 changed files with 23 additions and 32 deletions

View File

@ -195,6 +195,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
sessionRef.setSecurityCtx(securityCtx); sessionRef.setSecurityCtx(securityCtx);
pendingSessions.invalidate(sessionMd.session.getId()); pendingSessions.invalidate(sessionMd.session.getId());
establishSession(sessionMd.session, sessionRef, sessionMd); establishSession(sessionMd.session, sessionRef, sessionMd);
webSocketService.handleCommands(sessionRef, cmdsWrapper); webSocketService.handleCommands(sessionRef, cmdsWrapper);
} }
} }

View File

@ -86,6 +86,7 @@ import javax.annotation.PreDestroy;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -147,7 +148,7 @@ public class DefaultWebSocketService implements WebSocketService {
private ScheduledExecutorService pingExecutor; private ScheduledExecutorService pingExecutor;
private String serviceId; private String serviceId;
private List<WsCmdHandler<? extends WsCmd>> cmdsHandlers; private Map<WsCmdType, WsCmdHandler<? extends WsCmd>> cmdsHandlers;
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -157,24 +158,23 @@ public class DefaultWebSocketService implements WebSocketService {
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
cmdsHandlers = List.of( cmdsHandlers = new EnumMap<>(WsCmdType.class);
newCmdHandler(WsCmdType.ATTRIBUTES, this::handleWsAttributesSubscriptionCmd), cmdsHandlers.put(WsCmdType.ATTRIBUTES, newCmdHandler(this::handleWsAttributesSubscriptionCmd));
newCmdHandler(WsCmdType.TIMESERIES, this::handleWsTimeseriesSubscriptionCmd), cmdsHandlers.put(WsCmdType.TIMESERIES, newCmdHandler(this::handleWsTimeseriesSubscriptionCmd));
newCmdHandler(WsCmdType.TIMESERIES_HISTORY, this::handleWsHistoryCmd), cmdsHandlers.put(WsCmdType.TIMESERIES_HISTORY, newCmdHandler(this::handleWsHistoryCmd));
newCmdHandler(WsCmdType.ENTITY_DATA, this::handleWsEntityDataCmd), cmdsHandlers.put(WsCmdType.ENTITY_DATA, newCmdHandler(this::handleWsEntityDataCmd));
newCmdHandler(WsCmdType.ALARM_DATA, this::handleWsAlarmDataCmd), cmdsHandlers.put(WsCmdType.ALARM_DATA, newCmdHandler(this::handleWsAlarmDataCmd));
newCmdHandler(WsCmdType.ENTITY_COUNT, this::handleWsEntityCountCmd), cmdsHandlers.put(WsCmdType.ENTITY_COUNT, newCmdHandler(this::handleWsEntityCountCmd));
newCmdHandler(WsCmdType.ALARM_COUNT, this::handleWsAlarmCountCmd), cmdsHandlers.put(WsCmdType.ALARM_COUNT, newCmdHandler(this::handleWsAlarmCountCmd));
newCmdHandler(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), cmdsHandlers.put(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
newCmdHandler(WsCmdType.ALARM_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), cmdsHandlers.put(WsCmdType.ALARM_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
newCmdHandler(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), cmdsHandlers.put(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
newCmdHandler(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), cmdsHandlers.put(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
newCmdHandler(WsCmdType.NOTIFICATIONS, notificationCmdsHandler::handleUnreadNotificationsSubCmd), cmdsHandlers.put(WsCmdType.NOTIFICATIONS, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsSubCmd));
newCmdHandler(WsCmdType.NOTIFICATIONS_COUNT, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), cmdsHandlers.put(WsCmdType.NOTIFICATIONS_COUNT, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsCountSubCmd));
newCmdHandler(WsCmdType.MARK_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAsReadCmd), cmdsHandlers.put(WsCmdType.MARK_NOTIFICATIONS_AS_READ, newCmdHandler(notificationCmdsHandler::handleMarkAsReadCmd));
newCmdHandler(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAllAsReadCmd), cmdsHandlers.put(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, newCmdHandler(notificationCmdsHandler::handleMarkAllAsReadCmd));
newCmdHandler(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, notificationCmdsHandler::handleUnsubCmd) cmdsHandlers.put(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, newCmdHandler(notificationCmdsHandler::handleUnsubCmd));
);
} }
@PreDestroy @PreDestroy
@ -221,7 +221,7 @@ public class DefaultWebSocketService implements WebSocketService {
for (WsCmd cmd : commandsWrapper.getCmds()) { for (WsCmd cmd : commandsWrapper.getCmds()) {
log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd); log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd);
try { try {
Optional.ofNullable(getCmdHandler(cmd.getType())) Optional.ofNullable(cmdsHandlers.get(cmd.getType()))
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd)); .ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
} catch (Exception e) { } catch (Exception e) {
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
@ -963,24 +963,14 @@ public class DefaultWebSocketService implements WebSocketService {
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null); .map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
} }
public WsCmdHandler<? extends WsCmd> getCmdHandler(WsCmdType cmdType) { public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(BiConsumer<WebSocketSessionRef, C> handler) {
for (WsCmdHandler<? extends WsCmd> cmdHandler : cmdsHandlers) { return new WsCmdHandler<>(handler);
if (cmdHandler.getCmdType() == cmdType) {
return cmdHandler;
}
}
return null;
}
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(WsCmdType cmdType, BiConsumer<WebSocketSessionRef, C> handler) {
return new WsCmdHandler<>(cmdType, handler);
} }
@RequiredArgsConstructor @RequiredArgsConstructor
@Getter @Getter
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static class WsCmdHandler<C extends WsCmd> { public static class WsCmdHandler<C extends WsCmd> {
private final WsCmdType cmdType;
protected final BiConsumer<WebSocketSessionRef, C> handler; protected final BiConsumer<WebSocketSessionRef, C> handler;
public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) { public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) {