From 26cd9b226d7f934cc901ef80114a88bd6f9fa920 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 29 Nov 2023 13:44:58 +0200 Subject: [PATCH 1/4] Single validateSessionMetadata on WS cmds --- .../server/service/ws/DefaultWebSocketService.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 47478b3fdb..895699756a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -259,6 +259,8 @@ public class DefaultWebSocketService implements WebSocketService { log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e); } + } else { + return; } } } @@ -850,10 +852,6 @@ public class DefaultWebSocketService implements WebSocketService { return true; } - private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { - return validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId); - } - private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, int cmdId, String sessionId) { WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); if (sessionMD == null) { From 9e9c63fea2845847ad8cc88e63b97bd4337f03df Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 4 Dec 2023 15:02:45 +0200 Subject: [PATCH 2/4] WS api refactoring --- .../service/ws/DefaultWebSocketService.java | 51 +++++++++---------- .../cmd/NotificationCmdsWrapper.java | 20 ++++---- ...mdsWrapper.java => WsCommandsWrapper.java} | 12 ++--- .../controller/TbTestWebSocketClient.java | 10 ++-- .../notification/NotificationApiWsClient.java | 20 ++++---- .../lwm2m/AbstractLwM2MIntegrationTest.java | 4 +- 6 files changed, 57 insertions(+), 60 deletions(-) rename application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/{WsCmdsWrapper.java => WsCommandsWrapper.java} (87%) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 895699756a..85f8bd7207 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -66,7 +66,7 @@ import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.WsCmd; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd; @@ -160,22 +160,22 @@ public class DefaultWebSocketService implements WebSocketService { pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); cmdsHandlers = List.of( - newCmdsHandler(WsCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), - newCmdsHandler(WsCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), - newCmdsHandler(WsCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), - newCmdsHandler(WsCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), - newCmdsHandler(WsCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), - newCmdsHandler(WsCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), - newCmdsHandler(WsCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), - newCmdsHandler(WsCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdHandler(WsCmdsWrapper::getUnreadNotificationsSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), - newCmdHandler(WsCmdsWrapper::getUnreadNotificationsCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), - newCmdHandler(WsCmdsWrapper::getMarkNotificationAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), - newCmdHandler(WsCmdsWrapper::getMarkAllNotificationsAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), - newCmdHandler(WsCmdsWrapper::getNotificationsUnsubCmd, notificationCmdsHandler::handleUnsubCmd) + newCmdsHandler(WsCommandsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), + newCmdsHandler(WsCommandsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), + newCmdsHandler(WsCommandsWrapper::getHistoryCmds, this::handleWsHistoryCmd), + newCmdsHandler(WsCommandsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), + newCmdsHandler(WsCommandsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), + newCmdsHandler(WsCommandsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), + newCmdsHandler(WsCommandsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), + newCmdsHandler(WsCommandsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCommandsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCommandsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCommandsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), + newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsSubCmds, notificationCmdsHandler::handleUnreadNotificationsSubCmd), + newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsCountSubCmds, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), + newCmdsHandler(WsCommandsWrapper::getMarkNotificationAsReadCmds, notificationCmdsHandler::handleMarkAsReadCmd), + newCmdsHandler(WsCommandsWrapper::getMarkAllNotificationsAsReadCmds, notificationCmdsHandler::handleMarkAllAsReadCmd), + newCmdsHandler(WsCommandsWrapper::getNotificationsUnsubCmds, notificationCmdsHandler::handleUnsubCmd) ); } @@ -232,7 +232,7 @@ public class DefaultWebSocketService implements WebSocketService { } private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { - WsCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCmdsWrapper.class); + WsCommandsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCommandsWrapper.class); processCmds(sessionRef, cmdsWrapper); } @@ -241,7 +241,7 @@ public class DefaultWebSocketService implements WebSocketService { processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper()); } - private void processCmds(WebSocketSessionRef sessionRef, WsCmdsWrapper cmdsWrapper) { + private void processCmds(WebSocketSessionRef sessionRef, WsCommandsWrapper cmdsWrapper) { if (cmdsWrapper == null) { return; } @@ -1033,22 +1033,17 @@ public class DefaultWebSocketService implements WebSocketService { } - public static WsCmdHandler newCmdHandler(java.util.function.Function cmdExtractor, - BiConsumer handler) { - return new WsCmdHandler<>(cmdExtractor, handler); - } - - public static WsCmdsHandler newCmdsHandler(java.util.function.Function> cmdsExtractor, + public static WsCmdsHandler newCmdsHandler(java.util.function.Function> cmdsExtractor, BiConsumer handler) { return new WsCmdsHandler<>(cmdsExtractor, handler); } @RequiredArgsConstructor public static class WsCmdsHandler { - private final java.util.function.Function> cmdsExtractor; + private final java.util.function.Function> cmdsExtractor; protected final BiConsumer handler; - public List extract(WsCmdsWrapper cmdsWrapper) { + public List extract(WsCommandsWrapper cmdsWrapper) { return cmdsExtractor.apply(cmdsWrapper); } @@ -1059,7 +1054,7 @@ public class DefaultWebSocketService implements WebSocketService { } public static class WsCmdHandler extends WsCmdsHandler { - public WsCmdHandler(java.util.function.Function cmdExtractor, BiConsumer handler) { + public WsCmdHandler(java.util.function.Function cmdExtractor, BiConsumer handler) { super(cmdsWrapper -> { C cmd = cmdExtractor.apply(cmdsWrapper); return cmd != null ? List.of(cmd) : null; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java index de7a6adab7..664cfcef5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java @@ -17,10 +17,12 @@ package org.thingsboard.server.service.ws.notification.cmd; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; + +import java.util.List; /** - * @deprecated Use {@link org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper}. This class is left for backward compatibility + * @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility * */ @Data @Deprecated @@ -37,13 +39,13 @@ public class NotificationCmdsWrapper { private NotificationsUnsubCmd unsubCmd; @JsonIgnore - public WsCmdsWrapper toCommonCmdsWrapper() { - WsCmdsWrapper wrapper = new WsCmdsWrapper(); - wrapper.setUnreadNotificationsCountSubCmd(unreadCountSubCmd); - wrapper.setUnreadNotificationsSubCmd(unreadSubCmd); - wrapper.setMarkNotificationAsReadCmd(markAsReadCmd); - wrapper.setMarkAllNotificationsAsReadCmd(markAllAsReadCmd); - wrapper.setNotificationsUnsubCmd(unsubCmd); + public WsCommandsWrapper toCommonCmdsWrapper() { + WsCommandsWrapper wrapper = new WsCommandsWrapper(); + wrapper.setUnreadNotificationsCountSubCmds(List.of(unreadCountSubCmd)); + wrapper.setUnreadNotificationsSubCmds(List.of(unreadSubCmd)); + wrapper.setMarkNotificationAsReadCmds(List.of(markAsReadCmd)); + wrapper.setMarkAllNotificationsAsReadCmds(List.of(markAllAsReadCmd)); + wrapper.setNotificationsUnsubCmds(List.of(unsubCmd)); return wrapper; } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java similarity index 87% rename from application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java rename to application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java index 40eb672030..34d2acfbb9 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java @@ -39,7 +39,7 @@ import java.util.List; * @author Andrew Shvayka */ @Data -public class WsCmdsWrapper { +public class WsCommandsWrapper { private List attrSubCmds; @@ -63,14 +63,14 @@ public class WsCmdsWrapper { private List alarmCountUnsubscribeCmds; - private NotificationsCountSubCmd unreadNotificationsCountSubCmd; + private List unreadNotificationsCountSubCmds; - private NotificationsSubCmd unreadNotificationsSubCmd; + private List unreadNotificationsSubCmds; - private MarkNotificationsAsReadCmd markNotificationAsReadCmd; + private List markNotificationAsReadCmds; - private MarkAllNotificationsAsReadCmd markAllNotificationsAsReadCmd; + private List markAllNotificationsAsReadCmds; - private NotificationsUnsubCmd notificationsUnsubCmd; + private List notificationsUnsubCmds; } diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 43afcfc845..a365a36cf4 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityFilter; import org.thingsboard.server.common.data.query.EntityKey; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; @@ -106,19 +106,19 @@ public class TbTestWebSocketClient extends WebSocketClient { } public void send(EntityDataCmd cmd) throws NotYetConnectedException { - WsCmdsWrapper wrapper = new WsCmdsWrapper(); + WsCommandsWrapper wrapper = new WsCommandsWrapper(); wrapper.setEntityDataCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } public void send(EntityCountCmd cmd) throws NotYetConnectedException { - WsCmdsWrapper wrapper = new WsCmdsWrapper(); + WsCommandsWrapper wrapper = new WsCommandsWrapper(); wrapper.setEntityCountCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } public void send(AlarmCountCmd cmd) throws NotYetConnectedException { - WsCmdsWrapper wrapper = new WsCmdsWrapper(); + WsCommandsWrapper wrapper = new WsCommandsWrapper(); wrapper.setAlarmCountCmds(Collections.singletonList(cmd)); this.send(JacksonUtil.toString(wrapper)); } @@ -240,7 +240,7 @@ public class TbTestWebSocketClient extends WebSocketClient { cmd.setEntityId(entityId.getId().toString()); cmd.setScope(scope); cmd.setKeys(String.join(",", keys)); - WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); cmdsWrapper.setAttrSubCmds(List.of(cmd)); JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper); ((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type"); diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java index 31023fe258..1c4772ebf5 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java @@ -28,7 +28,7 @@ import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubC import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType; import java.net.URI; @@ -54,33 +54,33 @@ public class NotificationApiWsClient extends TbTestWebSocketClient { } public NotificationApiWsClient subscribeForUnreadNotifications(int limit) { - WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); - cmdsWrapper.setUnreadNotificationsSubCmd(new NotificationsSubCmd(1, limit)); + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); + cmdsWrapper.setUnreadNotificationsSubCmds(List.of(new NotificationsSubCmd(1, limit))); sendCmd(cmdsWrapper); this.limit = limit; return this; } public NotificationApiWsClient subscribeForUnreadNotificationsCount() { - WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); - cmdsWrapper.setUnreadNotificationsCountSubCmd(new NotificationsCountSubCmd(2)); + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); + cmdsWrapper.setUnreadNotificationsCountSubCmds(List.of(new NotificationsCountSubCmd(2))); sendCmd(cmdsWrapper); return this; } public void markNotificationAsRead(UUID... notifications) { - WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); - cmdsWrapper.setMarkNotificationAsReadCmd(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))); + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); + cmdsWrapper.setMarkNotificationAsReadCmds(List.of(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications)))); sendCmd(cmdsWrapper); } public void markAllNotificationsAsRead() { - WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); - cmdsWrapper.setMarkAllNotificationsAsReadCmd(new MarkAllNotificationsAsReadCmd(newCmdId())); + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); + cmdsWrapper.setMarkAllNotificationsAsReadCmds(List.of(new MarkAllNotificationsAsReadCmd(newCmdId()))); sendCmd(cmdsWrapper); } - public void sendCmd(WsCmdsWrapper cmdsWrapper) { + public void sendCmd(WsCommandsWrapper cmdsWrapper) { String cmd = JacksonUtil.toString(cmdsWrapper); send(cmd); } diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java index 3dd011ac78..94c18f1b73 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java @@ -63,7 +63,7 @@ import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; @@ -229,7 +229,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - WsCmdsWrapper wrapper = new WsCmdsWrapper(); + WsCommandsWrapper wrapper = new WsCommandsWrapper(); wrapper.setEntityDataCmds(Collections.singletonList(cmd)); getWsClient().send(JacksonUtil.toString(wrapper)); From 73c7abcf86dc68fee482f8d7ab709bf8446a3e21 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 4 Dec 2023 17:45:27 +0200 Subject: [PATCH 3/4] Fix cmds conversion --- .../notification/cmd/NotificationCmdsWrapper.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java index 664cfcef5b..5161ce8072 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java @@ -41,12 +41,16 @@ public class NotificationCmdsWrapper { @JsonIgnore public WsCommandsWrapper toCommonCmdsWrapper() { WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setUnreadNotificationsCountSubCmds(List.of(unreadCountSubCmd)); - wrapper.setUnreadNotificationsSubCmds(List.of(unreadSubCmd)); - wrapper.setMarkNotificationAsReadCmds(List.of(markAsReadCmd)); - wrapper.setMarkAllNotificationsAsReadCmds(List.of(markAllAsReadCmd)); - wrapper.setNotificationsUnsubCmds(List.of(unsubCmd)); + wrapper.setUnreadNotificationsCountSubCmds(toList(unreadCountSubCmd)); + wrapper.setUnreadNotificationsSubCmds(toList(unreadSubCmd)); + wrapper.setMarkNotificationAsReadCmds(toList(markAsReadCmd)); + wrapper.setMarkAllNotificationsAsReadCmds(toList(markAllAsReadCmd)); + wrapper.setNotificationsUnsubCmds(toList(unsubCmd)); return wrapper; } + private List toList(C cmd) { + return cmd != null ? List.of(cmd) : null; + } + } From ca29c79a9f9aad139f3e4fa9a7b7c923d67d3818 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 5 Dec 2023 18:10:29 +0200 Subject: [PATCH 4/4] WS cmds structure refactoring; new WS api endpoint --- .../server/config/WebSocketConfiguration.java | 7 +- .../controller/plugin/TbWebSocketHandler.java | 53 ++-- .../service/ws/DefaultWebSocketService.java | 264 ++++++++---------- .../service/ws/WebSocketSessionType.java | 16 +- .../ws/{notification/cmd => }/WsCmd.java | 10 +- .../server/service/ws/WsCmdType.java | 37 +++ .../server/service/ws/WsCommandsWrapper.java | 69 +++++ .../cmd/MarkAllNotificationsAsReadCmd.java | 7 + .../cmd/MarkNotificationsAsReadCmd.java | 7 + .../cmd/NotificationCmdsWrapper.java | 22 +- .../cmd/NotificationsCountSubCmd.java | 7 + .../notification/cmd/NotificationsSubCmd.java | 7 + .../cmd/NotificationsUnsubCmd.java | 7 + ...Wrapper.java => TelemetryCmdsWrapper.java} | 39 +-- .../cmd/v1/AttributesSubscriptionCmd.java | 6 +- .../ws/telemetry/cmd/v1/GetHistoryCmd.java | 5 + .../ws/telemetry/cmd/v1/SubscriptionCmd.java | 2 - .../telemetry/cmd/v1/TelemetryPluginCmd.java | 2 +- .../cmd/v1/TimeseriesSubscriptionCmd.java | 8 +- .../ws/telemetry/cmd/v2/AlarmCountCmd.java | 6 + .../cmd/v2/AlarmCountUnsubscribeCmd.java | 5 + .../ws/telemetry/cmd/v2/AlarmDataCmd.java | 6 + .../cmd/v2/AlarmDataUnsubscribeCmd.java | 5 + .../service/ws/telemetry/cmd/v2/DataCmd.java | 4 +- .../ws/telemetry/cmd/v2/EntityCountCmd.java | 6 + .../cmd/v2/EntityCountUnsubscribeCmd.java | 5 + .../ws/telemetry/cmd/v2/EntityDataCmd.java | 5 + .../cmd/v2/EntityDataUnsubscribeCmd.java | 5 + .../ws/telemetry/cmd/v2/UnsubscribeCmd.java | 2 +- .../controller/AbstractControllerTest.java | 10 +- .../controller/TbTestWebSocketClient.java | 36 +-- .../server/controller/WebsocketApiTest.java | 7 +- .../notification/NotificationApiWsClient.java | 24 +- .../lwm2m/AbstractLwM2MIntegrationTest.java | 7 +- 34 files changed, 416 insertions(+), 292 deletions(-) rename application/src/main/java/org/thingsboard/server/service/ws/{notification/cmd => }/WsCmd.java (82%) create mode 100644 application/src/main/java/org/thingsboard/server/service/ws/WsCmdType.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ws/WsCommandsWrapper.java rename application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/{WsCommandsWrapper.java => TelemetryCmdsWrapper.java} (70%) diff --git a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java index 9a6ffb0e32..e6097ec5d2 100644 --- a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java +++ b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java @@ -46,8 +46,9 @@ import java.util.Map; @Slf4j public class WebSocketConfiguration implements WebSocketConfigurer { - public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/"; - private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**"; + public static final String WS_API_ENDPOINT = "/api/ws"; + public static final String WS_PLUGINS_ENDPOINT = "/api/ws/plugins/"; + private static final String WS_API_MAPPING = "/api/ws/**"; private final WebSocketHandler wsHandler; @@ -65,7 +66,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer { log.error("TbWebSocketHandler expected but [{}] provided", wsHandler); throw new RuntimeException("TbWebSocketHandler expected but " + wsHandler + " provided"); } - registry.addHandler(wsHandler, WS_PLUGIN_MAPPING).setAllowedOriginPatterns("*") + registry.addHandler(wsHandler, WS_API_MAPPING).setAllowedOriginPatterns("*") .addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() { @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index e1f20be0e4..de073aca9a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -16,6 +16,7 @@ package org.thingsboard.server.controller.plugin; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.BeanCreationNotAllowedException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -52,7 +53,6 @@ import javax.websocket.SendHandler; import javax.websocket.SendResult; import javax.websocket.Session; import java.io.IOException; -import java.net.URI; import java.security.InvalidParameterException; import java.util.Optional; import java.util.Queue; @@ -199,17 +199,16 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } - private WebSocketSessionRef toRef(WebSocketSession session) throws IOException { - URI sessionUri = session.getUri(); - String path = sessionUri.getPath(); - path = path.substring(WebSocketConfiguration.WS_PLUGIN_PREFIX.length()); - if (path.length() == 0) { - throw new IllegalArgumentException("URL should contain plugin token!"); + private WebSocketSessionRef toRef(WebSocketSession session) { + String path = session.getUri().getPath(); + WebSocketSessionType sessionType; + if (path.equals(WebSocketConfiguration.WS_API_ENDPOINT)) { + sessionType = WebSocketSessionType.GENERAL; + } else { + String type = StringUtils.substringAfter(path, WebSocketConfiguration.WS_PLUGINS_ENDPOINT); + sessionType = WebSocketSessionType.forName(type) + .orElseThrow(() -> new InvalidParameterException("Unknown session type")); } - String[] pathElements = path.split("/"); - String serviceToken = pathElements[0]; - WebSocketSessionType sessionType = WebSocketSessionType.forName(serviceToken) - .orElseThrow(() -> new InvalidParameterException("Can't find plugin with specified token!")); SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal(); return WebSocketSessionRef.builder() @@ -411,10 +410,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } if (!limitAllowed) { - log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached" - , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); - session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!")); - return false; + log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached" + , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); + session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!")); + return false; } } @@ -428,10 +427,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } if (!limitAllowed) { - log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached" - , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); - session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached")); - return false; + log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached" + , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); + session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached")); + return false; } } if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 @@ -444,10 +443,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } if (!limitAllowed) { - log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached" - , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); - session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached")); - return false; + log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached" + , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); + session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached")); + return false; } } if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 @@ -460,10 +459,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } if (!limitAllowed) { - log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached" - , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); - session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached")); - return false; + log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached" + , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId); + session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached")); + return false; } } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 85f8bd7207..0d7dc7fbfe 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -21,8 +21,10 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; @@ -65,8 +67,7 @@ import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; -import org.thingsboard.server.service.ws.notification.cmd.WsCmd; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd; @@ -149,7 +150,7 @@ public class DefaultWebSocketService implements WebSocketService { private ScheduledExecutorService pingExecutor; private String serviceId; - private List> cmdsHandlers; + private List> cmdsHandlers; @PostConstruct public void init() { @@ -160,22 +161,22 @@ public class DefaultWebSocketService implements WebSocketService { pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); cmdsHandlers = List.of( - newCmdsHandler(WsCommandsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), - newCmdsHandler(WsCommandsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), - newCmdsHandler(WsCommandsWrapper::getHistoryCmds, this::handleWsHistoryCmd), - newCmdsHandler(WsCommandsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), - newCmdsHandler(WsCommandsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), - newCmdsHandler(WsCommandsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), - newCmdsHandler(WsCommandsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), - newCmdsHandler(WsCommandsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCommandsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCommandsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCommandsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), - newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsSubCmds, notificationCmdsHandler::handleUnreadNotificationsSubCmd), - newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsCountSubCmds, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), - newCmdsHandler(WsCommandsWrapper::getMarkNotificationAsReadCmds, notificationCmdsHandler::handleMarkAsReadCmd), - newCmdsHandler(WsCommandsWrapper::getMarkAllNotificationsAsReadCmds, notificationCmdsHandler::handleMarkAllAsReadCmd), - newCmdsHandler(WsCommandsWrapper::getNotificationsUnsubCmds, notificationCmdsHandler::handleUnsubCmd) + newCmdHandler(WsCmdType.ATTRIBUTES, this::handleWsAttributesSubscriptionCmd), + newCmdHandler(WsCmdType.TIMESERIES, this::handleWsTimeseriesSubscriptionCmd), + newCmdHandler(WsCmdType.TIMESERIES_HISTORY, this::handleWsHistoryCmd), + newCmdHandler(WsCmdType.ENTITY_DATA, this::handleWsEntityDataCmd), + newCmdHandler(WsCmdType.ALARM_DATA, this::handleWsAlarmDataCmd), + newCmdHandler(WsCmdType.ENTITY_COUNT, this::handleWsEntityCountCmd), + newCmdHandler(WsCmdType.ALARM_COUNT, this::handleWsAlarmCountCmd), + newCmdHandler(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), + newCmdHandler(WsCmdType.ALARM_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), + newCmdHandler(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), + newCmdHandler(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd), + newCmdHandler(WsCmdType.NOTIFICATIONS, notificationCmdsHandler::handleUnreadNotificationsSubCmd), + newCmdHandler(WsCmdType.NOTIFICATIONS_COUNT, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), + newCmdHandler(WsCmdType.MARK_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAsReadCmd), + newCmdHandler(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAllAsReadCmd), + newCmdHandler(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, notificationCmdsHandler::handleUnsubCmd) ); } @@ -217,93 +218,71 @@ public class DefaultWebSocketService implements WebSocketService { } try { + WsCommandsWrapper cmdsWrapper; switch (sessionRef.getSessionType()) { case GENERAL: - processCmds(sessionRef, msg); + cmdsWrapper = JacksonUtil.fromString(msg, WsCommandsWrapper.class); + break; + case TELEMETRY: + cmdsWrapper = JacksonUtil.fromString(msg, TelemetryCmdsWrapper.class).toCommonCmdsWrapper(); break; case NOTIFICATIONS: - processNotificationCmds(sessionRef, msg); + cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class).toCommonCmdsWrapper(); break; + default: + throw new IllegalArgumentException("Unknown session type"); } - } catch (IOException e) { + processCmds(sessionRef, cmdsWrapper); + } catch (Exception e) { log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.BAD_REQUEST, FAILED_TO_PARSE_WS_COMMAND)); } } - private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { - WsCommandsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCommandsWrapper.class); - processCmds(sessionRef, cmdsWrapper); - } - - private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException { - NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class); - processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper()); - } - private void processCmds(WebSocketSessionRef sessionRef, WsCommandsWrapper cmdsWrapper) { - if (cmdsWrapper == null) { + if (cmdsWrapper == null || CollectionUtils.isEmpty(cmdsWrapper.getCmds())) { return; } String sessionId = sessionRef.getSessionId(); - for (WsCmdsHandler cmdHandler : cmdsHandlers) { - List cmds = cmdHandler.extract(cmdsWrapper); - if (cmds == null) { - continue; - } - for (WsCmd cmd : cmds) { - if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) { - try { - cmdHandler.handle(sessionRef, cmd); - } catch (Exception e) { - log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, - sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e); - } - } else { - return; - } + if (!validateSessionMetadata(sessionRef, cmdsWrapper.getCmds().get(0).getCmdId(), sessionId)) { + return; + } + + for (WsCmd cmd : cmdsWrapper.getCmds()) { + log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd); + try { + getCmdHandler(cmd.getType()).handle(sessionRef, cmd); + } catch (Exception e) { + log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, + sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e); } } } private void handleWsEntityDataCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) { - String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } private void handleWsEntityCountCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) { - String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } private void handleWsAlarmDataCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) { - String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - if (validateSubscriptionCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { - String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd); } private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { - String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - - if (validateSubscriptionCmd(sessionRef, cmd)) { + if (validateCmd(sessionRef, cmd)) { entityDataSubService.handleCmd(sessionRef, cmd); } } @@ -449,8 +428,6 @@ public class DefaultWebSocketService implements WebSocketService { } String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - if (cmd.isUnsubscribe()) { unsubscribe(sessionRef, cmd, sessionId); } else if (validateSubscriptionCmd(sessionRef, cmd)) { @@ -535,18 +512,15 @@ public class DefaultWebSocketService implements WebSocketService { } private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) { - if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Device id is empty!"); - sendWsMsg(sessionRef, update); - return; - } - if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Keys are empty!"); - sendWsMsg(sessionRef, update); - return; - } + if (!validateCmd(sessionRef, cmd, () -> { + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { + throw new IllegalArgumentException("Device id is empty!"); + } + if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) { + throw new IllegalArgumentException("Keys are empty!"); + } + })) return; + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))) @@ -623,9 +597,7 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void onFailure(Throwable e) { log.error(FAILED_TO_FETCH_ATTRIBUTES, e); - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - FAILED_TO_FETCH_ATTRIBUTES); - sendWsMsg(sessionRef, update); + sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, FAILED_TO_FETCH_ATTRIBUTES); } }; @@ -643,8 +615,6 @@ public class DefaultWebSocketService implements WebSocketService { } String sessionId = sessionRef.getSessionId(); - log.debug("[{}] Processing: {}", sessionId, cmd); - if (cmd.isUnsubscribe()) { unsubscribe(sessionRef, cmd, sessionId); } else if (validateSubscriptionCmd(sessionRef, cmd)) { @@ -783,9 +753,7 @@ public class DefaultWebSocketService implements WebSocketService { } else { log.info(FAILED_TO_FETCH_DATA, e); } - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - FAILED_TO_FETCH_DATA); - sendWsMsg(sessionRef, update); + sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, FAILED_TO_FETCH_DATA); } }; } @@ -799,82 +767,73 @@ public class DefaultWebSocketService implements WebSocketService { } private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) { - if (cmd.getCmdId() < 0) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Cmd id is negative value!"); - sendWsMsg(sessionRef, update); - return false; - } else if (cmd.getQuery() == null && !cmd.hasAnyCmd()) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Query is empty!"); - sendWsMsg(sessionRef, update); - return false; - } - return true; + return validateCmd(sessionRef, cmd, () -> { + if (cmd.getQuery() == null && !cmd.hasAnyCmd()) { + throw new IllegalArgumentException("Query is empty!"); + } + }); } private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) { - if (cmd.getCmdId() < 0) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Cmd id is negative value!"); - sendWsMsg(sessionRef, update); - return false; - } else if (cmd.getQuery() == null) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Query is empty!"); - sendWsMsg(sessionRef, update); - return false; - } - return true; + return validateCmd(sessionRef, cmd, () -> { + if (cmd.getQuery() == null) { + throw new IllegalArgumentException("Query is empty!"); + } + }); } private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) { - if (cmd.getCmdId() < 0) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Cmd id is negative value!"); - sendWsMsg(sessionRef, update); - return false; - } else if (cmd.getQuery() == null) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Query is empty!"); - sendWsMsg(sessionRef, update); - return false; - } - return true; + return validateCmd(sessionRef, cmd, () -> { + if (cmd.getQuery() == null) { + throw new IllegalArgumentException("Query is empty!"); + } + }); } private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) { - if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Device id is empty!"); - sendWsMsg(sessionRef, update); - return false; - } - return true; + return validateCmd(sessionRef, cmd, () -> { + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { + throw new IllegalArgumentException("Device id is empty!"); + } + }); } private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, int cmdId, String sessionId) { WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); if (sessionMD == null) { log.warn("[{}] Session meta data not found. ", sessionId); - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR, - SESSION_META_DATA_NOT_FOUND); - sendWsMsg(sessionRef, update); + sendError(sessionRef, cmdId, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND); return false; } else { return true; } } - private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { + private boolean validateCmd(WebSocketSessionRef sessionRef, WsCmd cmd) { + return validateCmd(sessionRef, cmd, null); + } + + private boolean validateCmd(WebSocketSessionRef sessionRef, C cmd, Runnable validator) { if (cmd.getCmdId() < 0) { - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, - "Cmd id is negative value!"); - sendWsMsg(sessionRef, update); + sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Cmd id is negative value!"); + return false; + } + try { + if (validator != null) { + validator.run(); + } + } catch (Exception e) { + sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, e.getMessage()); return false; } return true; } + private void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(subId, errorCode, errorMsg); + sendWsMsg(sessionRef, update); + } + private void sendWsMsg(WebSocketSessionRef sessionRef, EntityDataUpdate update) { sendWsMsg(sessionRef, update.getCmdId(), update); } @@ -1032,34 +991,29 @@ public class DefaultWebSocketService implements WebSocketService { .map(TenantProfile::getDefaultProfileConfiguration).orElse(null); } + public WsCmdHandler getCmdHandler(WsCmdType cmdType) { + for (WsCmdHandler cmdHandler : cmdsHandlers) { + if (cmdHandler.getCmdType() == cmdType) { + return cmdHandler; + } + } + throw new IllegalArgumentException("Unknown command type " + cmdType); + } - public static WsCmdsHandler newCmdsHandler(java.util.function.Function> cmdsExtractor, - BiConsumer handler) { - return new WsCmdsHandler<>(cmdsExtractor, handler); + public static WsCmdHandler newCmdHandler(WsCmdType cmdType, BiConsumer handler) { + return new WsCmdHandler<>(cmdType, handler); } @RequiredArgsConstructor - public static class WsCmdsHandler { - private final java.util.function.Function> cmdsExtractor; + @Getter + @SuppressWarnings("unchecked") + public static class WsCmdHandler { + private final WsCmdType cmdType; protected final BiConsumer handler; - public List extract(WsCommandsWrapper cmdsWrapper) { - return cmdsExtractor.apply(cmdsWrapper); - } - - @SuppressWarnings("unchecked") - public void handle(WebSocketSessionRef sessionRef, Object cmd) { + public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) { handler.accept(sessionRef, (C) cmd); } } - public static class WsCmdHandler extends WsCmdsHandler { - public WsCmdHandler(java.util.function.Function cmdExtractor, BiConsumer handler) { - super(cmdsWrapper -> { - C cmd = cmdExtractor.apply(cmdsWrapper); - return cmd != null ? List.of(cmd) : null; - }, handler); - } - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java index 5547ef3359..af2206686f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionType.java @@ -15,23 +15,25 @@ */ package org.thingsboard.server.service.ws; -import lombok.Getter; -import lombok.RequiredArgsConstructor; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; import java.util.Arrays; import java.util.Optional; -@RequiredArgsConstructor -@Getter +@NoArgsConstructor +@AllArgsConstructor public enum WebSocketSessionType { - GENERAL("telemetry"), + GENERAL(), + TELEMETRY("telemetry"), // deprecated NOTIFICATIONS("notifications"); // deprecated - private final String name; + private String name; public static Optional forName(String name) { return Arrays.stream(values()) - .filter(sessionType -> sessionType.getName().equals(name)) + .filter(sessionType -> StringUtils.equals(sessionType.name, name)) .findFirst(); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/WsCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/WsCmd.java similarity index 82% rename from application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/WsCmd.java rename to application/src/main/java/org/thingsboard/server/service/ws/WsCmd.java index 97bfeab70c..0f4678cf76 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/WsCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WsCmd.java @@ -13,8 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ws.notification.cmd; +package org.thingsboard.server.service.ws; + +import com.fasterxml.jackson.annotation.JsonIgnore; + public interface WsCmd { + int getCmdId(); + + @JsonIgnore + WsCmdType getType(); + } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WsCmdType.java b/application/src/main/java/org/thingsboard/server/service/ws/WsCmdType.java new file mode 100644 index 0000000000..ef1d8520a2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ws/WsCmdType.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ws; + +public enum WsCmdType { + ATTRIBUTES, + TIMESERIES, + TIMESERIES_HISTORY, + ENTITY_DATA, + ENTITY_COUNT, + ALARM_DATA, + ALARM_COUNT, + + NOTIFICATIONS, + NOTIFICATIONS_COUNT, + MARK_NOTIFICATIONS_AS_READ, + MARK_ALL_NOTIFICATIONS_AS_READ, + + ALARM_DATA_UNSUBSCRIBE, + ALARM_COUNT_UNSUBSCRIBE, + ENTITY_DATA_UNSUBSCRIBE, + ENTITY_COUNT_UNSUBSCRIBE, + NOTIFICATIONS_UNSUBSCRIBE +} diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WsCommandsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/WsCommandsWrapper.java new file mode 100644 index 0000000000..cbdbdf8c05 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ws/WsCommandsWrapper.java @@ -0,0 +1,69 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ws; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; +import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; +import org.thingsboard.server.service.ws.notification.cmd.NotificationsUnsubCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUnsubscribeCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUnsubscribeCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUnsubscribeCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUnsubscribeCmd; + +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class WsCommandsWrapper { + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes({ + @Type(name = "ATTRIBUTES", value = AttributesSubscriptionCmd.class), + @Type(name = "TIMESERIES", value = TimeseriesSubscriptionCmd.class), + @Type(name = "TIMESERIES_HISTORY", value = GetHistoryCmd.class), + @Type(name = "ENTITY_DATA", value = EntityDataCmd.class), + @Type(name = "ENTITY_COUNT", value = EntityCountCmd.class), + @Type(name = "ALARM_DATA", value = AlarmDataCmd.class), + @Type(name = "ALARM_COUNT", value = AlarmCountCmd.class), + @Type(name = "NOTIFICATIONS", value = NotificationsSubCmd.class), + @Type(name = "NOTIFICATIONS_COUNT", value = NotificationsCountSubCmd.class), + @Type(name = "MARK_NOTIFICATIONS_AS_READ", value = MarkNotificationsAsReadCmd.class), + @Type(name = "MARK_ALL_NOTIFICATIONS_AS_READ", value = MarkAllNotificationsAsReadCmd.class), + @Type(name = "ALARM_DATA_UNSUBSCRIBE", value = AlarmDataUnsubscribeCmd.class), + @Type(name = "ALARM_COUNT_UNSUBSCRIBE", value = AlarmCountUnsubscribeCmd.class), + @Type(name = "ENTITY_DATA_UNSUBSCRIBE", value = EntityDataUnsubscribeCmd.class), + @Type(name = "ENTITY_COUNT_UNSUBSCRIBE", value = EntityCountUnsubscribeCmd.class), + @Type(name = "NOTIFICATIONS_UNSUBSCRIBE", value = NotificationsUnsubCmd.class), + }) + private List cmds; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkAllNotificationsAsReadCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkAllNotificationsAsReadCmd.java index f096106135..d11992ba4f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkAllNotificationsAsReadCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkAllNotificationsAsReadCmd.java @@ -18,10 +18,17 @@ package org.thingsboard.server.service.ws.notification.cmd; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCmdType; @Data @NoArgsConstructor @AllArgsConstructor public class MarkAllNotificationsAsReadCmd implements WsCmd { private int cmdId; + + @Override + public WsCmdType getType() { + return WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkNotificationsAsReadCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkNotificationsAsReadCmd.java index 55de75387f..2452163fd0 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkNotificationsAsReadCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/MarkNotificationsAsReadCmd.java @@ -18,6 +18,8 @@ package org.thingsboard.server.service.ws.notification.cmd; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCmdType; import java.util.List; import java.util.UUID; @@ -28,4 +30,9 @@ import java.util.UUID; public class MarkNotificationsAsReadCmd implements WsCmd { private int cmdId; private List notifications; + + @Override + public WsCmdType getType() { + return WsCmdType.MARK_NOTIFICATIONS_AS_READ; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java index 5161ce8072..3e0eca1fab 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationCmdsWrapper.java @@ -17,9 +17,11 @@ package org.thingsboard.server.service.ws.notification.cmd; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; +import org.thingsboard.server.service.ws.WsCommandsWrapper; -import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility @@ -40,17 +42,11 @@ public class NotificationCmdsWrapper { @JsonIgnore public WsCommandsWrapper toCommonCmdsWrapper() { - WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setUnreadNotificationsCountSubCmds(toList(unreadCountSubCmd)); - wrapper.setUnreadNotificationsSubCmds(toList(unreadSubCmd)); - wrapper.setMarkNotificationAsReadCmds(toList(markAsReadCmd)); - wrapper.setMarkAllNotificationsAsReadCmds(toList(markAllAsReadCmd)); - wrapper.setNotificationsUnsubCmds(toList(unsubCmd)); - return wrapper; - } - - private List toList(C cmd) { - return cmd != null ? List.of(cmd) : null; + return new WsCommandsWrapper(Stream.of( + unreadCountSubCmd, unreadSubCmd, markAsReadCmd, markAllAsReadCmd, unsubCmd + ) + .filter(Objects::nonNull) + .collect(Collectors.toList())); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsCountSubCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsCountSubCmd.java index 0d6757a6e7..7fa0ac9a6a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsCountSubCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsCountSubCmd.java @@ -18,10 +18,17 @@ package org.thingsboard.server.service.ws.notification.cmd; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCmdType; @Data @NoArgsConstructor @AllArgsConstructor public class NotificationsCountSubCmd implements WsCmd { private int cmdId; + + @Override + public WsCmdType getType() { + return WsCmdType.NOTIFICATIONS_COUNT; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsSubCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsSubCmd.java index e022d2c49f..44d6c47ff6 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsSubCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsSubCmd.java @@ -18,6 +18,8 @@ package org.thingsboard.server.service.ws.notification.cmd; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCmdType; @Data @NoArgsConstructor @@ -25,4 +27,9 @@ import lombok.NoArgsConstructor; public class NotificationsSubCmd implements WsCmd { private int cmdId; private int limit; + + @Override + public WsCmdType getType() { + return WsCmdType.NOTIFICATIONS; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsUnsubCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsUnsubCmd.java index c9f0897eeb..a63351d9b9 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsUnsubCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/cmd/NotificationsUnsubCmd.java @@ -18,6 +18,8 @@ package org.thingsboard.server.service.ws.notification.cmd; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCmdType; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; @Data @@ -25,4 +27,9 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; @AllArgsConstructor public class NotificationsUnsubCmd implements UnsubscribeCmd, WsCmd { private int cmdId; + + @Override + public WsCmdType getType() { + return WsCmdType.NOTIFICATIONS_UNSUBSCRIBE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryCmdsWrapper.java similarity index 70% rename from application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java rename to application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryCmdsWrapper.java index 34d2acfbb9..e2029cbe36 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/WsCommandsWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/TelemetryCmdsWrapper.java @@ -15,12 +15,9 @@ */ package org.thingsboard.server.service.ws.telemetry.cmd; +import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; -import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd; -import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd; -import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd; -import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; -import org.thingsboard.server.service.ws.notification.cmd.NotificationsUnsubCmd; +import org.thingsboard.server.service.ws.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd; @@ -33,13 +30,18 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUnsubscribe import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUnsubscribeCmd; +import java.util.Collection; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * @author Andrew Shvayka - */ + * @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility + * */ @Data -public class WsCommandsWrapper { +@Deprecated +public class TelemetryCmdsWrapper { private List attrSubCmds; @@ -63,14 +65,17 @@ public class WsCommandsWrapper { private List alarmCountUnsubscribeCmds; - private List unreadNotificationsCountSubCmds; - - private List unreadNotificationsSubCmds; - - private List markNotificationAsReadCmds; - - private List markAllNotificationsAsReadCmds; - - private List notificationsUnsubCmds; + @JsonIgnore + public WsCommandsWrapper toCommonCmdsWrapper() { + return new WsCommandsWrapper(Stream.of( + attrSubCmds, tsSubCmds, historyCmds, entityDataCmds, + entityDataUnsubscribeCmds, alarmDataCmds, alarmDataUnsubscribeCmds, + entityCountCmds, entityCountUnsubscribeCmds, + alarmCountCmds, alarmCountUnsubscribeCmds + ) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .collect(Collectors.toList())); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/AttributesSubscriptionCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/AttributesSubscriptionCmd.java index 1ccec261b0..d1153b8bdd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/AttributesSubscriptionCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/AttributesSubscriptionCmd.java @@ -16,7 +16,7 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v1; import lombok.NoArgsConstructor; -import org.thingsboard.server.service.ws.telemetry.TelemetryFeature; +import org.thingsboard.server.service.ws.WsCmdType; /** * @author Andrew Shvayka @@ -25,8 +25,8 @@ import org.thingsboard.server.service.ws.telemetry.TelemetryFeature; public class AttributesSubscriptionCmd extends SubscriptionCmd { @Override - public TelemetryFeature getType() { - return TelemetryFeature.ATTRIBUTES; + public WsCmdType getType() { + return WsCmdType.ATTRIBUTES; } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/GetHistoryCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/GetHistoryCmd.java index 6791066641..0da067f6da 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/GetHistoryCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/GetHistoryCmd.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v1; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.service.ws.WsCmdType; /** * @author Andrew Shvayka @@ -37,4 +38,8 @@ public class GetHistoryCmd implements TelemetryPluginCmd { private int limit; private String agg; + @Override + public WsCmdType getType() { + return WsCmdType.TIMESERIES_HISTORY; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/SubscriptionCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/SubscriptionCmd.java index 6a5f9d820d..3b742c3a2f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/SubscriptionCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/SubscriptionCmd.java @@ -32,8 +32,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd { private String scope; private boolean unsubscribe; - public abstract TelemetryFeature getType(); - @Override public String toString() { return "SubscriptionCmd [entityType=" + entityType + ", entityId=" + entityId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]"; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java index 5081e71d0a..b57b907184 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TelemetryPluginCmd.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.service.ws.telemetry.cmd.v1; -import org.thingsboard.server.service.ws.notification.cmd.WsCmd; +import org.thingsboard.server.service.ws.WsCmd; /** * @author Andrew Shvayka diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TimeseriesSubscriptionCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TimeseriesSubscriptionCmd.java index f0a7b9d3af..aea228eebc 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TimeseriesSubscriptionCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v1/TimeseriesSubscriptionCmd.java @@ -17,8 +17,9 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v1; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import org.thingsboard.server.service.ws.telemetry.TelemetryFeature; +import org.thingsboard.server.service.ws.WsCmdType; /** * @author Andrew Shvayka @@ -26,6 +27,7 @@ import org.thingsboard.server.service.ws.telemetry.TelemetryFeature; @NoArgsConstructor @AllArgsConstructor @Data +@EqualsAndHashCode(callSuper = true) public class TimeseriesSubscriptionCmd extends SubscriptionCmd { private long startTs; @@ -35,7 +37,7 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd { private String agg; @Override - public TelemetryFeature getType() { - return TelemetryFeature.TIMESERIES; + public WsCmdType getType() { + return WsCmdType.TIMESERIES; } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountCmd.java index 80fd998eb1..b1fa9cf785 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountCmd.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import org.thingsboard.server.common.data.query.AlarmCountQuery; +import org.thingsboard.server.service.ws.WsCmdType; public class AlarmCountCmd extends DataCmd { @@ -31,4 +32,9 @@ public class AlarmCountCmd extends DataCmd { super(cmdId); this.query = query; } + + @Override + public WsCmdType getType() { + return WsCmdType.ALARM_COUNT; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountUnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountUnsubscribeCmd.java index 79c860f3f6..02cf5d5572 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountUnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmCountUnsubscribeCmd.java @@ -16,10 +16,15 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; +import org.thingsboard.server.service.ws.WsCmdType; @Data public class AlarmCountUnsubscribeCmd implements UnsubscribeCmd { private final int cmdId; + @Override + public WsCmdType getType() { + return WsCmdType.ALARM_COUNT_UNSUBSCRIBE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataCmd.java index 0f83d00d6d..0efa6ae608 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataCmd.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import org.thingsboard.server.common.data.query.AlarmDataQuery; +import org.thingsboard.server.service.ws.WsCmdType; public class AlarmDataCmd extends DataCmd { @@ -30,4 +31,9 @@ public class AlarmDataCmd extends DataCmd { super(cmdId); this.query = query; } + + @Override + public WsCmdType getType() { + return WsCmdType.ALARM_DATA; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataUnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataUnsubscribeCmd.java index 97db558cf9..7dfb1159c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataUnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/AlarmDataUnsubscribeCmd.java @@ -16,10 +16,15 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; +import org.thingsboard.server.service.ws.WsCmdType; @Data public class AlarmDataUnsubscribeCmd implements UnsubscribeCmd { private final int cmdId; + @Override + public WsCmdType getType() { + return WsCmdType.ALARM_DATA_UNSUBSCRIBE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java index 282e95f511..74133230cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/DataCmd.java @@ -17,10 +17,10 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; import lombok.Getter; -import org.thingsboard.server.service.ws.notification.cmd.WsCmd; +import org.thingsboard.server.service.ws.WsCmd; @Data -public class DataCmd implements WsCmd { +public abstract class DataCmd implements WsCmd { @Getter private final int cmdId; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountCmd.java index c39f81f1b7..3431afaf1a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountCmd.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import org.thingsboard.server.common.data.query.EntityCountQuery; +import org.thingsboard.server.service.ws.WsCmdType; public class EntityCountCmd extends DataCmd { @@ -31,4 +32,9 @@ public class EntityCountCmd extends DataCmd { super(cmdId); this.query = query; } + + @Override + public WsCmdType getType() { + return WsCmdType.ENTITY_COUNT; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountUnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountUnsubscribeCmd.java index b82fecd8fd..4e0a50c7e1 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountUnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityCountUnsubscribeCmd.java @@ -16,10 +16,15 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; +import org.thingsboard.server.service.ws.WsCmdType; @Data public class EntityCountUnsubscribeCmd implements UnsubscribeCmd { private final int cmdId; + @Override + public WsCmdType getType() { + return WsCmdType.ENTITY_COUNT_UNSUBSCRIBE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataCmd.java index 5fe9179651..ace96f6673 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataCmd.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.service.ws.WsCmdType; public class EntityDataCmd extends DataCmd { @@ -62,4 +63,8 @@ public class EntityDataCmd extends DataCmd { return historyCmd != null || latestCmd != null || tsCmd != null || aggHistoryCmd != null || aggTsCmd != null; } + @Override + public WsCmdType getType() { + return WsCmdType.ENTITY_DATA; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataUnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataUnsubscribeCmd.java index 80577edda2..56727faf54 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataUnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/EntityDataUnsubscribeCmd.java @@ -16,10 +16,15 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import lombok.Data; +import org.thingsboard.server.service.ws.WsCmdType; @Data public class EntityDataUnsubscribeCmd implements UnsubscribeCmd { private final int cmdId; + @Override + public WsCmdType getType() { + return WsCmdType.ENTITY_DATA_UNSUBSCRIBE; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java index 3bdc70b503..4efdb279be 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/UnsubscribeCmd.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.service.ws.telemetry.cmd.v2; -import org.thingsboard.server.service.ws.notification.cmd.WsCmd; +import org.thingsboard.server.service.ws.WsCmd; public interface UnsubscribeCmd extends WsCmd { diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index 5809ab5e42..fe09a187e1 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -52,8 +52,8 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest { @LocalServerPort protected int wsPort; - private volatile TbTestWebSocketClient wsClient; // lazy - private volatile TbTestWebSocketClient anotherWsClient; // lazy + protected volatile TbTestWebSocketClient wsClient; // lazy + protected volatile TbTestWebSocketClient anotherWsClient; // lazy public TbTestWebSocketClient getWsClient() { if (wsClient == null) { @@ -101,7 +101,11 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest { } protected TbTestWebSocketClient buildAndConnectWebSocketClient() throws URISyntaxException, InterruptedException { - TbTestWebSocketClient wsClient = new TbTestWebSocketClient(new URI(WS_URL + wsPort + "/api/ws/plugins/telemetry?token=" + token)); + return buildAndConnectWebSocketClient("/api/ws"); + } + + protected TbTestWebSocketClient buildAndConnectWebSocketClient(String path) throws URISyntaxException, InterruptedException { + TbTestWebSocketClient wsClient = new TbTestWebSocketClient(new URI(WS_URL + wsPort + path + "?token=" + token)); assertThat(wsClient.connectBlocking(TIMEOUT, TimeUnit.SECONDS)).isTrue(); return wsClient; } diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index a365a36cf4..0194911e5b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -16,7 +16,6 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; @@ -28,11 +27,10 @@ import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityFilter; import org.thingsboard.server.common.data.query.EntityKey; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; +import org.thingsboard.server.service.ws.WsCmd; +import org.thingsboard.server.service.ws.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; -import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; -import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; @@ -105,24 +103,6 @@ public class TbTestWebSocketClient extends WebSocketClient { super.send(text); } - public void send(EntityDataCmd cmd) throws NotYetConnectedException { - WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - this.send(JacksonUtil.toString(wrapper)); - } - - public void send(EntityCountCmd cmd) throws NotYetConnectedException { - WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setEntityCountCmds(Collections.singletonList(cmd)); - this.send(JacksonUtil.toString(wrapper)); - } - - public void send(AlarmCountCmd cmd) throws NotYetConnectedException { - WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setAlarmCountCmds(Collections.singletonList(cmd)); - this.send(JacksonUtil.toString(wrapper)); - } - public String waitForUpdate() { return waitForUpdate(false); } @@ -240,11 +220,7 @@ public class TbTestWebSocketClient extends WebSocketClient { cmd.setEntityId(entityId.getId().toString()); cmd.setScope(scope); cmd.setKeys(String.join(",", keys)); - WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); - cmdsWrapper.setAttrSubCmds(List.of(cmd)); - JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper); - ((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type"); - send(msg.toString()); + send(cmd); return JacksonUtil.toJsonNode(waitForReply()); } @@ -288,4 +264,10 @@ public class TbTestWebSocketClient extends WebSocketClient { return sendEntityDataQuery(edq); } + public void send(WsCmd... cmds) { + WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); + cmdsWrapper.setCmds(List.of(cmds)); + send(JacksonUtil.toString(cmdsWrapper)); + } + } diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index 73ee8f9252..070fda9474 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -648,7 +648,7 @@ public class WebsocketApiTest extends AbstractControllerTest { } @Test - public void testEntityCountCmd_filterTypeSingularCompatibilityTest() { + public void testEntityCountCmd_filterTypeSingularCompatibilityTest() throws Exception { ObjectNode oldFormatDeviceTypeFilterSingular = JacksonUtil.newObjectNode(); oldFormatDeviceTypeFilterSingular.put("type", "deviceType"); oldFormatDeviceTypeFilterSingular.put("deviceType", "default"); @@ -667,9 +667,10 @@ public class WebsocketApiTest extends AbstractControllerTest { ObjectNode wrapperNode = JacksonUtil.newObjectNode(); wrapperNode.set("entityCountCmds", entityCountCmds); - getWsClient().send(JacksonUtil.toString(wrapperNode)); + wsClient = buildAndConnectWebSocketClient("/api/ws/plugins/telemetry"); + wsClient.send(JacksonUtil.toString(wrapperNode)); - EntityCountUpdate update = getWsClient().parseCountReply(getWsClient().waitForReply()); + EntityCountUpdate update = wsClient.parseCountReply(wsClient.waitForReply()); Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(1, update.getCount()); diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java index 1c4772ebf5..2cff2939c2 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationApiWsClient.java @@ -28,7 +28,6 @@ import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubC import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType; import java.net.URI; @@ -50,39 +49,26 @@ public class NotificationApiWsClient extends TbTestWebSocketClient { private List notifications; public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException { - super(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + token)); + super(new URI(wsUrl + "/api/ws?token=" + token)); } public NotificationApiWsClient subscribeForUnreadNotifications(int limit) { - WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); - cmdsWrapper.setUnreadNotificationsSubCmds(List.of(new NotificationsSubCmd(1, limit))); - sendCmd(cmdsWrapper); + send(new NotificationsSubCmd(1, limit)); this.limit = limit; return this; } public NotificationApiWsClient subscribeForUnreadNotificationsCount() { - WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); - cmdsWrapper.setUnreadNotificationsCountSubCmds(List.of(new NotificationsCountSubCmd(2))); - sendCmd(cmdsWrapper); + send(new NotificationsCountSubCmd(2)); return this; } public void markNotificationAsRead(UUID... notifications) { - WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); - cmdsWrapper.setMarkNotificationAsReadCmds(List.of(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications)))); - sendCmd(cmdsWrapper); + send(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))); } public void markAllNotificationsAsRead() { - WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); - cmdsWrapper.setMarkAllNotificationsAsReadCmds(List.of(new MarkAllNotificationsAsReadCmd(newCmdId()))); - sendCmd(cmdsWrapper); - } - - public void sendCmd(WsCommandsWrapper cmdsWrapper) { - String cmd = JacksonUtil.toString(cmdsWrapper); - send(cmd); + send(new MarkAllNotificationsAsReadCmd(newCmdId())); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java index 94c18f1b73..7f31ebe74f 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java @@ -63,7 +63,7 @@ import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; -import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper; +import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; @@ -229,10 +229,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - WsCommandsWrapper wrapper = new WsCommandsWrapper(); - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); - - getWsClient().send(JacksonUtil.toString(wrapper)); + getWsClient().send(cmd); getWsClient().waitForReply(); getWsClient().registerWaitForUpdate();