WS api refactoring

This commit is contained in:
ViacheslavKlimov 2023-12-04 15:02:45 +02:00
parent 26cd9b226d
commit 9e9c63fea2
6 changed files with 57 additions and 60 deletions

View File

@ -66,7 +66,7 @@ import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler; import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
import org.thingsboard.server.service.ws.notification.cmd.WsCmd; import org.thingsboard.server.service.ws.notification.cmd.WsCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd;
@ -160,22 +160,22 @@ public class DefaultWebSocketService implements WebSocketService {
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
cmdsHandlers = List.of( cmdsHandlers = List.of(
newCmdsHandler(WsCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd), newCmdsHandler(WsCommandsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd),
newCmdsHandler(WsCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd), newCmdsHandler(WsCommandsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd),
newCmdsHandler(WsCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd), newCmdsHandler(WsCommandsWrapper::getHistoryCmds, this::handleWsHistoryCmd),
newCmdsHandler(WsCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd), newCmdsHandler(WsCommandsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd),
newCmdsHandler(WsCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd), newCmdsHandler(WsCommandsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd),
newCmdsHandler(WsCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd), newCmdsHandler(WsCommandsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd),
newCmdsHandler(WsCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd), newCmdsHandler(WsCommandsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd),
newCmdsHandler(WsCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), newCmdsHandler(WsCommandsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
newCmdsHandler(WsCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), newCmdsHandler(WsCommandsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
newCmdsHandler(WsCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), newCmdsHandler(WsCommandsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
newCmdsHandler(WsCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd), newCmdsHandler(WsCommandsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd), newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsSubCmds, notificationCmdsHandler::handleUnreadNotificationsSubCmd),
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd), newCmdsHandler(WsCommandsWrapper::getUnreadNotificationsCountSubCmds, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd),
newCmdHandler(WsCmdsWrapper::getMarkNotificationAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd), newCmdsHandler(WsCommandsWrapper::getMarkNotificationAsReadCmds, notificationCmdsHandler::handleMarkAsReadCmd),
newCmdHandler(WsCmdsWrapper::getMarkAllNotificationsAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd), newCmdsHandler(WsCommandsWrapper::getMarkAllNotificationsAsReadCmds, notificationCmdsHandler::handleMarkAllAsReadCmd),
newCmdHandler(WsCmdsWrapper::getNotificationsUnsubCmd, notificationCmdsHandler::handleUnsubCmd) newCmdsHandler(WsCommandsWrapper::getNotificationsUnsubCmds, notificationCmdsHandler::handleUnsubCmd)
); );
} }
@ -232,7 +232,7 @@ public class DefaultWebSocketService implements WebSocketService {
} }
private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException {
WsCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCmdsWrapper.class); WsCommandsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCommandsWrapper.class);
processCmds(sessionRef, cmdsWrapper); processCmds(sessionRef, cmdsWrapper);
} }
@ -241,7 +241,7 @@ public class DefaultWebSocketService implements WebSocketService {
processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper()); processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper());
} }
private void processCmds(WebSocketSessionRef sessionRef, WsCmdsWrapper cmdsWrapper) { private void processCmds(WebSocketSessionRef sessionRef, WsCommandsWrapper cmdsWrapper) {
if (cmdsWrapper == null) { if (cmdsWrapper == null) {
return; return;
} }
@ -1033,22 +1033,17 @@ public class DefaultWebSocketService implements WebSocketService {
} }
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor, public static <C extends WsCmd> WsCmdsHandler<C> newCmdsHandler(java.util.function.Function<WsCommandsWrapper, List<C>> cmdsExtractor,
BiConsumer<WebSocketSessionRef, C> handler) {
return new WsCmdHandler<>(cmdExtractor, handler);
}
public static <C extends WsCmd> WsCmdsHandler<C> newCmdsHandler(java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor,
BiConsumer<WebSocketSessionRef, C> handler) { BiConsumer<WebSocketSessionRef, C> handler) {
return new WsCmdsHandler<>(cmdsExtractor, handler); return new WsCmdsHandler<>(cmdsExtractor, handler);
} }
@RequiredArgsConstructor @RequiredArgsConstructor
public static class WsCmdsHandler<C extends WsCmd> { public static class WsCmdsHandler<C extends WsCmd> {
private final java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor; private final java.util.function.Function<WsCommandsWrapper, List<C>> cmdsExtractor;
protected final BiConsumer<WebSocketSessionRef, C> handler; protected final BiConsumer<WebSocketSessionRef, C> handler;
public List<C> extract(WsCmdsWrapper cmdsWrapper) { public List<C> extract(WsCommandsWrapper cmdsWrapper) {
return cmdsExtractor.apply(cmdsWrapper); return cmdsExtractor.apply(cmdsWrapper);
} }
@ -1059,7 +1054,7 @@ public class DefaultWebSocketService implements WebSocketService {
} }
public static class WsCmdHandler<C extends WsCmd> extends WsCmdsHandler<C> { public static class WsCmdHandler<C extends WsCmd> extends WsCmdsHandler<C> {
public WsCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor, BiConsumer<WebSocketSessionRef, C> handler) { public WsCmdHandler(java.util.function.Function<WsCommandsWrapper, C> cmdExtractor, BiConsumer<WebSocketSessionRef, C> handler) {
super(cmdsWrapper -> { super(cmdsWrapper -> {
C cmd = cmdExtractor.apply(cmdsWrapper); C cmd = cmdExtractor.apply(cmdsWrapper);
return cmd != null ? List.of(cmd) : null; return cmd != null ? List.of(cmd) : null;

View File

@ -17,10 +17,12 @@ package org.thingsboard.server.service.ws.notification.cmd;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper;
import java.util.List;
/** /**
* @deprecated Use {@link org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper}. This class is left for backward compatibility * @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility
* */ * */
@Data @Data
@Deprecated @Deprecated
@ -37,13 +39,13 @@ public class NotificationCmdsWrapper {
private NotificationsUnsubCmd unsubCmd; private NotificationsUnsubCmd unsubCmd;
@JsonIgnore @JsonIgnore
public WsCmdsWrapper toCommonCmdsWrapper() { public WsCommandsWrapper toCommonCmdsWrapper() {
WsCmdsWrapper wrapper = new WsCmdsWrapper(); WsCommandsWrapper wrapper = new WsCommandsWrapper();
wrapper.setUnreadNotificationsCountSubCmd(unreadCountSubCmd); wrapper.setUnreadNotificationsCountSubCmds(List.of(unreadCountSubCmd));
wrapper.setUnreadNotificationsSubCmd(unreadSubCmd); wrapper.setUnreadNotificationsSubCmds(List.of(unreadSubCmd));
wrapper.setMarkNotificationAsReadCmd(markAsReadCmd); wrapper.setMarkNotificationAsReadCmds(List.of(markAsReadCmd));
wrapper.setMarkAllNotificationsAsReadCmd(markAllAsReadCmd); wrapper.setMarkAllNotificationsAsReadCmds(List.of(markAllAsReadCmd));
wrapper.setNotificationsUnsubCmd(unsubCmd); wrapper.setNotificationsUnsubCmds(List.of(unsubCmd));
return wrapper; return wrapper;
} }

View File

@ -39,7 +39,7 @@ import java.util.List;
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Data @Data
public class WsCmdsWrapper { public class WsCommandsWrapper {
private List<AttributesSubscriptionCmd> attrSubCmds; private List<AttributesSubscriptionCmd> attrSubCmds;
@ -63,14 +63,14 @@ public class WsCmdsWrapper {
private List<AlarmCountUnsubscribeCmd> alarmCountUnsubscribeCmds; private List<AlarmCountUnsubscribeCmd> alarmCountUnsubscribeCmds;
private NotificationsCountSubCmd unreadNotificationsCountSubCmd; private List<NotificationsCountSubCmd> unreadNotificationsCountSubCmds;
private NotificationsSubCmd unreadNotificationsSubCmd; private List<NotificationsSubCmd> unreadNotificationsSubCmds;
private MarkNotificationsAsReadCmd markNotificationAsReadCmd; private List<MarkNotificationsAsReadCmd> markNotificationAsReadCmds;
private MarkAllNotificationsAsReadCmd markAllNotificationsAsReadCmd; private List<MarkAllNotificationsAsReadCmd> markAllNotificationsAsReadCmds;
private NotificationsUnsubCmd notificationsUnsubCmd; private List<NotificationsUnsubCmd> notificationsUnsubCmds;
} }

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityFilter; import org.thingsboard.server.common.data.query.EntityFilter;
import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
@ -106,19 +106,19 @@ public class TbTestWebSocketClient extends WebSocketClient {
} }
public void send(EntityDataCmd cmd) throws NotYetConnectedException { public void send(EntityDataCmd cmd) throws NotYetConnectedException {
WsCmdsWrapper wrapper = new WsCmdsWrapper(); WsCommandsWrapper wrapper = new WsCommandsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd)); wrapper.setEntityDataCmds(Collections.singletonList(cmd));
this.send(JacksonUtil.toString(wrapper)); this.send(JacksonUtil.toString(wrapper));
} }
public void send(EntityCountCmd cmd) throws NotYetConnectedException { public void send(EntityCountCmd cmd) throws NotYetConnectedException {
WsCmdsWrapper wrapper = new WsCmdsWrapper(); WsCommandsWrapper wrapper = new WsCommandsWrapper();
wrapper.setEntityCountCmds(Collections.singletonList(cmd)); wrapper.setEntityCountCmds(Collections.singletonList(cmd));
this.send(JacksonUtil.toString(wrapper)); this.send(JacksonUtil.toString(wrapper));
} }
public void send(AlarmCountCmd cmd) throws NotYetConnectedException { public void send(AlarmCountCmd cmd) throws NotYetConnectedException {
WsCmdsWrapper wrapper = new WsCmdsWrapper(); WsCommandsWrapper wrapper = new WsCommandsWrapper();
wrapper.setAlarmCountCmds(Collections.singletonList(cmd)); wrapper.setAlarmCountCmds(Collections.singletonList(cmd));
this.send(JacksonUtil.toString(wrapper)); this.send(JacksonUtil.toString(wrapper));
} }
@ -240,7 +240,7 @@ public class TbTestWebSocketClient extends WebSocketClient {
cmd.setEntityId(entityId.getId().toString()); cmd.setEntityId(entityId.getId().toString());
cmd.setScope(scope); cmd.setScope(scope);
cmd.setKeys(String.join(",", keys)); cmd.setKeys(String.join(",", keys));
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setAttrSubCmds(List.of(cmd)); cmdsWrapper.setAttrSubCmds(List.of(cmd));
JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper); JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper);
((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type"); ((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type");

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubC
import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd; import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd;
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate;
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType;
import java.net.URI; import java.net.URI;
@ -54,33 +54,33 @@ public class NotificationApiWsClient extends TbTestWebSocketClient {
} }
public NotificationApiWsClient subscribeForUnreadNotifications(int limit) { public NotificationApiWsClient subscribeForUnreadNotifications(int limit) {
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setUnreadNotificationsSubCmd(new NotificationsSubCmd(1, limit)); cmdsWrapper.setUnreadNotificationsSubCmds(List.of(new NotificationsSubCmd(1, limit)));
sendCmd(cmdsWrapper); sendCmd(cmdsWrapper);
this.limit = limit; this.limit = limit;
return this; return this;
} }
public NotificationApiWsClient subscribeForUnreadNotificationsCount() { public NotificationApiWsClient subscribeForUnreadNotificationsCount() {
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setUnreadNotificationsCountSubCmd(new NotificationsCountSubCmd(2)); cmdsWrapper.setUnreadNotificationsCountSubCmds(List.of(new NotificationsCountSubCmd(2)));
sendCmd(cmdsWrapper); sendCmd(cmdsWrapper);
return this; return this;
} }
public void markNotificationAsRead(UUID... notifications) { public void markNotificationAsRead(UUID... notifications) {
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setMarkNotificationAsReadCmd(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))); cmdsWrapper.setMarkNotificationAsReadCmds(List.of(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications))));
sendCmd(cmdsWrapper); sendCmd(cmdsWrapper);
} }
public void markAllNotificationsAsRead() { public void markAllNotificationsAsRead() {
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setMarkAllNotificationsAsReadCmd(new MarkAllNotificationsAsReadCmd(newCmdId())); cmdsWrapper.setMarkAllNotificationsAsReadCmds(List.of(new MarkAllNotificationsAsReadCmd(newCmdId())));
sendCmd(cmdsWrapper); sendCmd(cmdsWrapper);
} }
public void sendCmd(WsCmdsWrapper cmdsWrapper) { public void sendCmd(WsCommandsWrapper cmdsWrapper) {
String cmd = JacksonUtil.toString(cmdsWrapper); String cmd = JacksonUtil.toString(cmdsWrapper);
send(cmd); send(cmd);
} }

View File

@ -63,7 +63,7 @@ import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.WsCommandsWrapper;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
@ -229,7 +229,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null); EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
WsCmdsWrapper wrapper = new WsCmdsWrapper(); WsCommandsWrapper wrapper = new WsCommandsWrapper();
wrapper.setEntityDataCmds(Collections.singletonList(cmd)); wrapper.setEntityDataCmds(Collections.singletonList(cmd));
getWsClient().send(JacksonUtil.toString(wrapper)); getWsClient().send(JacksonUtil.toString(wrapper));