Merge branch 'websocket-improvements' into feature/notifications-widget
This commit is contained in:
commit
2e2d3b975f
@ -46,8 +46,9 @@ import java.util.Map;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class WebSocketConfiguration implements WebSocketConfigurer {
|
public class WebSocketConfiguration implements WebSocketConfigurer {
|
||||||
|
|
||||||
public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/";
|
public static final String WS_API_ENDPOINT = "/api/ws";
|
||||||
private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**";
|
public static final String WS_PLUGINS_ENDPOINT = "/api/ws/plugins/";
|
||||||
|
private static final String WS_API_MAPPING = "/api/ws/**";
|
||||||
|
|
||||||
private final WebSocketHandler wsHandler;
|
private final WebSocketHandler wsHandler;
|
||||||
|
|
||||||
@ -65,7 +66,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
|
|||||||
log.error("TbWebSocketHandler expected but [{}] provided", wsHandler);
|
log.error("TbWebSocketHandler expected but [{}] provided", wsHandler);
|
||||||
throw new RuntimeException("TbWebSocketHandler expected but " + wsHandler + " provided");
|
throw new RuntimeException("TbWebSocketHandler expected but " + wsHandler + " provided");
|
||||||
}
|
}
|
||||||
registry.addHandler(wsHandler, WS_PLUGIN_MAPPING).setAllowedOriginPatterns("*")
|
registry.addHandler(wsHandler, WS_API_MAPPING).setAllowedOriginPatterns("*")
|
||||||
.addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() {
|
.addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.controller.plugin;
|
package org.thingsboard.server.controller.plugin;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.beans.factory.BeanCreationNotAllowedException;
|
import org.springframework.beans.factory.BeanCreationNotAllowedException;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@ -52,7 +53,6 @@ import javax.websocket.SendHandler;
|
|||||||
import javax.websocket.SendResult;
|
import javax.websocket.SendResult;
|
||||||
import javax.websocket.Session;
|
import javax.websocket.Session;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
import java.security.InvalidParameterException;
|
import java.security.InvalidParameterException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
@ -199,17 +199,16 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private WebSocketSessionRef toRef(WebSocketSession session) throws IOException {
|
private WebSocketSessionRef toRef(WebSocketSession session) {
|
||||||
URI sessionUri = session.getUri();
|
String path = session.getUri().getPath();
|
||||||
String path = sessionUri.getPath();
|
WebSocketSessionType sessionType;
|
||||||
path = path.substring(WebSocketConfiguration.WS_PLUGIN_PREFIX.length());
|
if (path.equals(WebSocketConfiguration.WS_API_ENDPOINT)) {
|
||||||
if (path.length() == 0) {
|
sessionType = WebSocketSessionType.GENERAL;
|
||||||
throw new IllegalArgumentException("URL should contain plugin token!");
|
} else {
|
||||||
|
String type = StringUtils.substringAfter(path, WebSocketConfiguration.WS_PLUGINS_ENDPOINT);
|
||||||
|
sessionType = WebSocketSessionType.forName(type)
|
||||||
|
.orElseThrow(() -> new InvalidParameterException("Unknown session type"));
|
||||||
}
|
}
|
||||||
String[] pathElements = path.split("/");
|
|
||||||
String serviceToken = pathElements[0];
|
|
||||||
WebSocketSessionType sessionType = WebSocketSessionType.forName(serviceToken)
|
|
||||||
.orElseThrow(() -> new InvalidParameterException("Can't find plugin with specified token!"));
|
|
||||||
|
|
||||||
SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal();
|
SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal();
|
||||||
return WebSocketSessionRef.builder()
|
return WebSocketSessionRef.builder()
|
||||||
@ -411,10 +410,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -428,10 +427,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
|
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
|
||||||
@ -444,10 +443,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
|
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
|
||||||
@ -460,10 +459,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,8 +21,10 @@ import com.google.common.util.concurrent.FutureCallback;
|
|||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import lombok.Getter;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.socket.CloseStatus;
|
import org.springframework.web.socket.CloseStatus;
|
||||||
@ -65,8 +67,7 @@ import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
|
|||||||
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
|
import org.thingsboard.server.service.subscription.TbTimeSeriesSubscription;
|
||||||
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
|
import org.thingsboard.server.service.ws.notification.NotificationCommandsHandler;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.WsCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper;
|
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.SubscriptionCmd;
|
||||||
@ -149,7 +150,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
private ScheduledExecutorService pingExecutor;
|
private ScheduledExecutorService pingExecutor;
|
||||||
private String serviceId;
|
private String serviceId;
|
||||||
|
|
||||||
private List<WsCmdsHandler<? extends WsCmd>> cmdsHandlers;
|
private List<WsCmdHandler<? extends WsCmd>> cmdsHandlers;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -160,22 +161,22 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
|
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
cmdsHandlers = List.of(
|
cmdsHandlers = List.of(
|
||||||
newCmdsHandler(WsCmdsWrapper::getAttrSubCmds, this::handleWsAttributesSubscriptionCmd),
|
newCmdHandler(WsCmdType.ATTRIBUTES, this::handleWsAttributesSubscriptionCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getTsSubCmds, this::handleWsTimeseriesSubscriptionCmd),
|
newCmdHandler(WsCmdType.TIMESERIES, this::handleWsTimeseriesSubscriptionCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getHistoryCmds, this::handleWsHistoryCmd),
|
newCmdHandler(WsCmdType.TIMESERIES_HISTORY, this::handleWsHistoryCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getEntityDataCmds, this::handleWsEntityDataCmd),
|
newCmdHandler(WsCmdType.ENTITY_DATA, this::handleWsEntityDataCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getAlarmDataCmds, this::handleWsAlarmDataCmd),
|
newCmdHandler(WsCmdType.ALARM_DATA, this::handleWsAlarmDataCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getEntityCountCmds, this::handleWsEntityCountCmd),
|
newCmdHandler(WsCmdType.ENTITY_COUNT, this::handleWsEntityCountCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getAlarmCountCmds, this::handleWsAlarmCountCmd),
|
newCmdHandler(WsCmdType.ALARM_COUNT, this::handleWsAlarmCountCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getEntityDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
|
newCmdHandler(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getAlarmDataUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
|
newCmdHandler(WsCmdType.ALARM_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getEntityCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
|
newCmdHandler(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
||||||
newCmdsHandler(WsCmdsWrapper::getAlarmCountUnsubscribeCmds, this::handleWsDataUnsubscribeCmd),
|
newCmdHandler(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
||||||
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsSubCmd, notificationCmdsHandler::handleUnreadNotificationsSubCmd),
|
newCmdHandler(WsCmdType.NOTIFICATIONS, notificationCmdsHandler::handleUnreadNotificationsSubCmd),
|
||||||
newCmdHandler(WsCmdsWrapper::getUnreadNotificationsCountSubCmd, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd),
|
newCmdHandler(WsCmdType.NOTIFICATIONS_COUNT, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd),
|
||||||
newCmdHandler(WsCmdsWrapper::getMarkNotificationAsReadCmd, notificationCmdsHandler::handleMarkAsReadCmd),
|
newCmdHandler(WsCmdType.MARK_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAsReadCmd),
|
||||||
newCmdHandler(WsCmdsWrapper::getMarkAllNotificationsAsReadCmd, notificationCmdsHandler::handleMarkAllAsReadCmd),
|
newCmdHandler(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAllAsReadCmd),
|
||||||
newCmdHandler(WsCmdsWrapper::getNotificationsUnsubCmd, notificationCmdsHandler::handleUnsubCmd)
|
newCmdHandler(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, notificationCmdsHandler::handleUnsubCmd)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -217,91 +218,71 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
WsCommandsWrapper cmdsWrapper;
|
||||||
switch (sessionRef.getSessionType()) {
|
switch (sessionRef.getSessionType()) {
|
||||||
case GENERAL:
|
case GENERAL:
|
||||||
processCmds(sessionRef, msg);
|
cmdsWrapper = JacksonUtil.fromString(msg, WsCommandsWrapper.class);
|
||||||
|
break;
|
||||||
|
case TELEMETRY:
|
||||||
|
cmdsWrapper = JacksonUtil.fromString(msg, TelemetryCmdsWrapper.class).toCommonCmdsWrapper();
|
||||||
break;
|
break;
|
||||||
case NOTIFICATIONS:
|
case NOTIFICATIONS:
|
||||||
processNotificationCmds(sessionRef, msg);
|
cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class).toCommonCmdsWrapper();
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unknown session type");
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
processCmds(sessionRef, cmdsWrapper);
|
||||||
|
} catch (Exception e) {
|
||||||
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
|
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
|
||||||
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.BAD_REQUEST, FAILED_TO_PARSE_WS_COMMAND));
|
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.BAD_REQUEST, FAILED_TO_PARSE_WS_COMMAND));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException {
|
private void processCmds(WebSocketSessionRef sessionRef, WsCommandsWrapper cmdsWrapper) {
|
||||||
WsCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, WsCmdsWrapper.class);
|
if (cmdsWrapper == null || CollectionUtils.isEmpty(cmdsWrapper.getCmds())) {
|
||||||
processCmds(sessionRef, cmdsWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException {
|
|
||||||
NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class);
|
|
||||||
processCmds(sessionRef, cmdsWrapper.toCommonCmdsWrapper());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processCmds(WebSocketSessionRef sessionRef, WsCmdsWrapper cmdsWrapper) {
|
|
||||||
if (cmdsWrapper == null) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String sessionId = sessionRef.getSessionId();
|
String sessionId = sessionRef.getSessionId();
|
||||||
for (WsCmdsHandler<? extends WsCmd> cmdHandler : cmdsHandlers) {
|
if (!validateSessionMetadata(sessionRef, cmdsWrapper.getCmds().get(0).getCmdId(), sessionId)) {
|
||||||
List<? extends WsCmd> cmds = cmdHandler.extract(cmdsWrapper);
|
return;
|
||||||
if (cmds == null) {
|
}
|
||||||
continue;
|
|
||||||
}
|
for (WsCmd cmd : cmdsWrapper.getCmds()) {
|
||||||
for (WsCmd cmd : cmds) {
|
log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd);
|
||||||
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)) {
|
try {
|
||||||
try {
|
getCmdHandler(cmd.getType()).handle(sessionRef, cmd);
|
||||||
cmdHandler.handle(sessionRef, cmd);
|
} catch (Exception e) {
|
||||||
} catch (Exception e) {
|
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
|
||||||
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
|
sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e);
|
||||||
sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsEntityDataCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
private void handleWsEntityDataCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
||||||
String sessionId = sessionRef.getSessionId();
|
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
||||||
entityDataSubService.handleCmd(sessionRef, cmd);
|
entityDataSubService.handleCmd(sessionRef, cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsEntityCountCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
|
private void handleWsEntityCountCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
|
||||||
String sessionId = sessionRef.getSessionId();
|
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
||||||
entityDataSubService.handleCmd(sessionRef, cmd);
|
entityDataSubService.handleCmd(sessionRef, cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsAlarmDataCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
|
private void handleWsAlarmDataCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
|
||||||
String sessionId = sessionRef.getSessionId();
|
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
||||||
entityDataSubService.handleCmd(sessionRef, cmd);
|
entityDataSubService.handleCmd(sessionRef, cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
|
private void handleWsDataUnsubscribeCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
|
||||||
String sessionId = sessionRef.getSessionId();
|
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd);
|
entityDataSubService.cancelSubscription(sessionRef.getSessionId(), cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
|
private void handleWsAlarmCountCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
|
||||||
String sessionId = sessionRef.getSessionId();
|
if (validateCmd(sessionRef, cmd)) {
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (validateSubscriptionCmd(sessionRef, cmd)) {
|
|
||||||
entityDataSubService.handleCmd(sessionRef, cmd);
|
entityDataSubService.handleCmd(sessionRef, cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -447,8 +428,6 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String sessionId = sessionRef.getSessionId();
|
String sessionId = sessionRef.getSessionId();
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (cmd.isUnsubscribe()) {
|
if (cmd.isUnsubscribe()) {
|
||||||
unsubscribe(sessionRef, cmd, sessionId);
|
unsubscribe(sessionRef, cmd, sessionId);
|
||||||
} else if (validateSubscriptionCmd(sessionRef, cmd)) {
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) {
|
||||||
@ -533,18 +512,15 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
|
private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
|
||||||
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
|
if (!validateCmd(sessionRef, cmd, () -> {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
|
||||||
"Device id is empty!");
|
throw new IllegalArgumentException("Device id is empty!");
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
return;
|
if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
|
||||||
}
|
throw new IllegalArgumentException("Keys are empty!");
|
||||||
if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
|
}
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
})) return;
|
||||||
"Keys are empty!");
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
|
||||||
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
||||||
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
|
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
|
||||||
@ -621,9 +597,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public void onFailure(Throwable e) {
|
||||||
log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
|
log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, FAILED_TO_FETCH_ATTRIBUTES);
|
||||||
FAILED_TO_FETCH_ATTRIBUTES);
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -641,8 +615,6 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String sessionId = sessionRef.getSessionId();
|
String sessionId = sessionRef.getSessionId();
|
||||||
log.debug("[{}] Processing: {}", sessionId, cmd);
|
|
||||||
|
|
||||||
if (cmd.isUnsubscribe()) {
|
if (cmd.isUnsubscribe()) {
|
||||||
unsubscribe(sessionRef, cmd, sessionId);
|
unsubscribe(sessionRef, cmd, sessionId);
|
||||||
} else if (validateSubscriptionCmd(sessionRef, cmd)) {
|
} else if (validateSubscriptionCmd(sessionRef, cmd)) {
|
||||||
@ -781,9 +753,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
} else {
|
} else {
|
||||||
log.info(FAILED_TO_FETCH_DATA, e);
|
log.info(FAILED_TO_FETCH_DATA, e);
|
||||||
}
|
}
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, FAILED_TO_FETCH_DATA);
|
||||||
FAILED_TO_FETCH_DATA);
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -797,86 +767,73 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
||||||
if (cmd.getCmdId() < 0) {
|
return validateCmd(sessionRef, cmd, () -> {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {
|
||||||
"Cmd id is negative value!");
|
throw new IllegalArgumentException("Query is empty!");
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
return false;
|
});
|
||||||
} else if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {
|
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
|
||||||
"Query is empty!");
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
|
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityCountCmd cmd) {
|
||||||
if (cmd.getCmdId() < 0) {
|
return validateCmd(sessionRef, cmd, () -> {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
if (cmd.getQuery() == null) {
|
||||||
"Cmd id is negative value!");
|
throw new IllegalArgumentException("Query is empty!");
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
return false;
|
});
|
||||||
} else if (cmd.getQuery() == null) {
|
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Query is empty!");
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
|
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
|
||||||
if (cmd.getCmdId() < 0) {
|
return validateCmd(sessionRef, cmd, () -> {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
if (cmd.getQuery() == null) {
|
||||||
"Cmd id is negative value!");
|
throw new IllegalArgumentException("Query is empty!");
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
return false;
|
});
|
||||||
} else if (cmd.getQuery() == null) {
|
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
|
||||||
"Query is empty!");
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
|
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
|
||||||
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
|
return validateCmd(sessionRef, cmd, () -> {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
|
||||||
"Device id is empty!");
|
throw new IllegalArgumentException("Device id is empty!");
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
return false;
|
});
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
|
|
||||||
return validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, int cmdId, String sessionId) {
|
private boolean validateSessionMetadata(WebSocketSessionRef sessionRef, int cmdId, String sessionId) {
|
||||||
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
|
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
|
||||||
if (sessionMD == null) {
|
if (sessionMD == null) {
|
||||||
log.warn("[{}] Session meta data not found. ", sessionId);
|
log.warn("[{}] Session meta data not found. ", sessionId);
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR,
|
sendError(sessionRef, cmdId, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
|
||||||
SESSION_META_DATA_NOT_FOUND);
|
|
||||||
sendWsMsg(sessionRef, update);
|
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
|
private boolean validateCmd(WebSocketSessionRef sessionRef, WsCmd cmd) {
|
||||||
|
return validateCmd(sessionRef, cmd, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <C extends WsCmd> boolean validateCmd(WebSocketSessionRef sessionRef, C cmd, Runnable validator) {
|
||||||
if (cmd.getCmdId() < 0) {
|
if (cmd.getCmdId() < 0) {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
|
sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Cmd id is negative value!");
|
||||||
"Cmd id is negative value!");
|
return false;
|
||||||
sendWsMsg(sessionRef, update);
|
}
|
||||||
|
try {
|
||||||
|
if (validator != null) {
|
||||||
|
validator.run();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
sendError(sessionRef, cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, e.getMessage());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg) {
|
||||||
|
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(subId, errorCode, errorMsg);
|
||||||
|
sendWsMsg(sessionRef, update);
|
||||||
|
}
|
||||||
|
|
||||||
private void sendWsMsg(WebSocketSessionRef sessionRef, EntityDataUpdate update) {
|
private void sendWsMsg(WebSocketSessionRef sessionRef, EntityDataUpdate update) {
|
||||||
sendWsMsg(sessionRef, update.getCmdId(), update);
|
sendWsMsg(sessionRef, update.getCmdId(), update);
|
||||||
}
|
}
|
||||||
@ -1034,39 +991,29 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
|
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public WsCmdHandler<? extends WsCmd> getCmdHandler(WsCmdType cmdType) {
|
||||||
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor,
|
for (WsCmdHandler<? extends WsCmd> cmdHandler : cmdsHandlers) {
|
||||||
BiConsumer<WebSocketSessionRef, C> handler) {
|
if (cmdHandler.getCmdType() == cmdType) {
|
||||||
return new WsCmdHandler<>(cmdExtractor, handler);
|
return cmdHandler;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("Unknown command type " + cmdType);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <C extends WsCmd> WsCmdsHandler<C> newCmdsHandler(java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor,
|
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(WsCmdType cmdType, BiConsumer<WebSocketSessionRef, C> handler) {
|
||||||
BiConsumer<WebSocketSessionRef, C> handler) {
|
return new WsCmdHandler<>(cmdType, handler);
|
||||||
return new WsCmdsHandler<>(cmdsExtractor, handler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public static class WsCmdsHandler<C extends WsCmd> {
|
@Getter
|
||||||
private final java.util.function.Function<WsCmdsWrapper, List<C>> cmdsExtractor;
|
@SuppressWarnings("unchecked")
|
||||||
|
public static class WsCmdHandler<C extends WsCmd> {
|
||||||
|
private final WsCmdType cmdType;
|
||||||
protected final BiConsumer<WebSocketSessionRef, C> handler;
|
protected final BiConsumer<WebSocketSessionRef, C> handler;
|
||||||
|
|
||||||
public List<C> extract(WsCmdsWrapper cmdsWrapper) {
|
public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) {
|
||||||
return cmdsExtractor.apply(cmdsWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void handle(WebSocketSessionRef sessionRef, Object cmd) {
|
|
||||||
handler.accept(sessionRef, (C) cmd);
|
handler.accept(sessionRef, (C) cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class WsCmdHandler<C extends WsCmd> extends WsCmdsHandler<C> {
|
|
||||||
public WsCmdHandler(java.util.function.Function<WsCmdsWrapper, C> cmdExtractor, BiConsumer<WebSocketSessionRef, C> handler) {
|
|
||||||
super(cmdsWrapper -> {
|
|
||||||
C cmd = cmdExtractor.apply(cmdsWrapper);
|
|
||||||
return cmd != null ? List.of(cmd) : null;
|
|
||||||
}, handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,23 +15,25 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ws;
|
package org.thingsboard.server.service.ws;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
@NoArgsConstructor
|
||||||
@Getter
|
@AllArgsConstructor
|
||||||
public enum WebSocketSessionType {
|
public enum WebSocketSessionType {
|
||||||
GENERAL("telemetry"),
|
GENERAL(),
|
||||||
|
TELEMETRY("telemetry"), // deprecated
|
||||||
NOTIFICATIONS("notifications"); // deprecated
|
NOTIFICATIONS("notifications"); // deprecated
|
||||||
|
|
||||||
private final String name;
|
private String name;
|
||||||
|
|
||||||
public static Optional<WebSocketSessionType> forName(String name) {
|
public static Optional<WebSocketSessionType> forName(String name) {
|
||||||
return Arrays.stream(values())
|
return Arrays.stream(values())
|
||||||
.filter(sessionType -> sessionType.getName().equals(name))
|
.filter(sessionType -> StringUtils.equals(sessionType.name, name))
|
||||||
.findFirst();
|
.findFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -13,8 +13,16 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ws.notification.cmd;
|
package org.thingsboard.server.service.ws;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
|
||||||
|
|
||||||
public interface WsCmd {
|
public interface WsCmd {
|
||||||
|
|
||||||
int getCmdId();
|
int getCmdId();
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
WsCmdType getType();
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -0,0 +1,37 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.ws;
|
||||||
|
|
||||||
|
public enum WsCmdType {
|
||||||
|
ATTRIBUTES,
|
||||||
|
TIMESERIES,
|
||||||
|
TIMESERIES_HISTORY,
|
||||||
|
ENTITY_DATA,
|
||||||
|
ENTITY_COUNT,
|
||||||
|
ALARM_DATA,
|
||||||
|
ALARM_COUNT,
|
||||||
|
|
||||||
|
NOTIFICATIONS,
|
||||||
|
NOTIFICATIONS_COUNT,
|
||||||
|
MARK_NOTIFICATIONS_AS_READ,
|
||||||
|
MARK_ALL_NOTIFICATIONS_AS_READ,
|
||||||
|
|
||||||
|
ALARM_DATA_UNSUBSCRIBE,
|
||||||
|
ALARM_COUNT_UNSUBSCRIBE,
|
||||||
|
ENTITY_DATA_UNSUBSCRIBE,
|
||||||
|
ENTITY_COUNT_UNSUBSCRIBE,
|
||||||
|
NOTIFICATIONS_UNSUBSCRIBE
|
||||||
|
}
|
||||||
@ -0,0 +1,69 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.ws;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd;
|
||||||
|
import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd;
|
||||||
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd;
|
||||||
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd;
|
||||||
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationsUnsubCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUnsubscribeCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUnsubscribeCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUnsubscribeCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
||||||
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class WsCommandsWrapper {
|
||||||
|
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||||
|
@JsonSubTypes({
|
||||||
|
@Type(name = "ATTRIBUTES", value = AttributesSubscriptionCmd.class),
|
||||||
|
@Type(name = "TIMESERIES", value = TimeseriesSubscriptionCmd.class),
|
||||||
|
@Type(name = "TIMESERIES_HISTORY", value = GetHistoryCmd.class),
|
||||||
|
@Type(name = "ENTITY_DATA", value = EntityDataCmd.class),
|
||||||
|
@Type(name = "ENTITY_COUNT", value = EntityCountCmd.class),
|
||||||
|
@Type(name = "ALARM_DATA", value = AlarmDataCmd.class),
|
||||||
|
@Type(name = "ALARM_COUNT", value = AlarmCountCmd.class),
|
||||||
|
@Type(name = "NOTIFICATIONS", value = NotificationsSubCmd.class),
|
||||||
|
@Type(name = "NOTIFICATIONS_COUNT", value = NotificationsCountSubCmd.class),
|
||||||
|
@Type(name = "MARK_NOTIFICATIONS_AS_READ", value = MarkNotificationsAsReadCmd.class),
|
||||||
|
@Type(name = "MARK_ALL_NOTIFICATIONS_AS_READ", value = MarkAllNotificationsAsReadCmd.class),
|
||||||
|
@Type(name = "ALARM_DATA_UNSUBSCRIBE", value = AlarmDataUnsubscribeCmd.class),
|
||||||
|
@Type(name = "ALARM_COUNT_UNSUBSCRIBE", value = AlarmCountUnsubscribeCmd.class),
|
||||||
|
@Type(name = "ENTITY_DATA_UNSUBSCRIBE", value = EntityDataUnsubscribeCmd.class),
|
||||||
|
@Type(name = "ENTITY_COUNT_UNSUBSCRIBE", value = EntityCountUnsubscribeCmd.class),
|
||||||
|
@Type(name = "NOTIFICATIONS_UNSUBSCRIBE", value = NotificationsUnsubCmd.class),
|
||||||
|
})
|
||||||
|
private List<WsCmd> cmds;
|
||||||
|
|
||||||
|
}
|
||||||
@ -18,10 +18,17 @@ package org.thingsboard.server.service.ws.notification.cmd;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class MarkAllNotificationsAsReadCmd implements WsCmd {
|
public class MarkAllNotificationsAsReadCmd implements WsCmd {
|
||||||
private int cmdId;
|
private int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,8 @@ package org.thingsboard.server.service.ws.notification.cmd;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -28,4 +30,9 @@ import java.util.UUID;
|
|||||||
public class MarkNotificationsAsReadCmd implements WsCmd {
|
public class MarkNotificationsAsReadCmd implements WsCmd {
|
||||||
private int cmdId;
|
private int cmdId;
|
||||||
private List<UUID> notifications;
|
private List<UUID> notifications;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.MARK_NOTIFICATIONS_AS_READ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,10 +17,14 @@ package org.thingsboard.server.service.ws.notification.cmd;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper;
|
import org.thingsboard.server.service.ws.WsCommandsWrapper;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper}. This class is left for backward compatibility
|
* @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility
|
||||||
* */
|
* */
|
||||||
@Data
|
@Data
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@ -37,14 +41,12 @@ public class NotificationCmdsWrapper {
|
|||||||
private NotificationsUnsubCmd unsubCmd;
|
private NotificationsUnsubCmd unsubCmd;
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
public WsCmdsWrapper toCommonCmdsWrapper() {
|
public WsCommandsWrapper toCommonCmdsWrapper() {
|
||||||
WsCmdsWrapper wrapper = new WsCmdsWrapper();
|
return new WsCommandsWrapper(Stream.of(
|
||||||
wrapper.setUnreadNotificationsCountSubCmd(unreadCountSubCmd);
|
unreadCountSubCmd, unreadSubCmd, markAsReadCmd, markAllAsReadCmd, unsubCmd
|
||||||
wrapper.setUnreadNotificationsSubCmd(unreadSubCmd);
|
)
|
||||||
wrapper.setMarkNotificationAsReadCmd(markAsReadCmd);
|
.filter(Objects::nonNull)
|
||||||
wrapper.setMarkAllNotificationsAsReadCmd(markAllAsReadCmd);
|
.collect(Collectors.toList()));
|
||||||
wrapper.setNotificationsUnsubCmd(unsubCmd);
|
|
||||||
return wrapper;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,10 +18,17 @@ package org.thingsboard.server.service.ws.notification.cmd;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class NotificationsCountSubCmd implements WsCmd {
|
public class NotificationsCountSubCmd implements WsCmd {
|
||||||
private int cmdId;
|
private int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.NOTIFICATIONS_COUNT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,6 +19,8 @@ import lombok.AllArgsConstructor;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.thingsboard.server.common.data.notification.NotificationType;
|
import org.thingsboard.server.common.data.notification.NotificationType;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@ -29,4 +31,9 @@ public class NotificationsSubCmd implements WsCmd {
|
|||||||
private int cmdId;
|
private int cmdId;
|
||||||
private int limit;
|
private int limit;
|
||||||
private Set<NotificationType> types;
|
private Set<NotificationType> types;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.NOTIFICATIONS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,8 @@ package org.thingsboard.server.service.ws.notification.cmd;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@ -25,4 +27,9 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
|
|||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class NotificationsUnsubCmd implements UnsubscribeCmd, WsCmd {
|
public class NotificationsUnsubCmd implements UnsubscribeCmd, WsCmd {
|
||||||
private int cmdId;
|
private int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.NOTIFICATIONS_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,12 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ws.telemetry.cmd;
|
package org.thingsboard.server.service.ws.telemetry.cmd;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.MarkAllNotificationsAsReadCmd;
|
import org.thingsboard.server.service.ws.WsCommandsWrapper;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd;
|
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubCmd;
|
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd;
|
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationsUnsubCmd;
|
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.GetHistoryCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
|
||||||
@ -33,13 +30,18 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUnsubscribe
|
|||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @deprecated Use {@link WsCommandsWrapper}. This class is left for backward compatibility
|
||||||
*/
|
* */
|
||||||
@Data
|
@Data
|
||||||
public class WsCmdsWrapper {
|
@Deprecated
|
||||||
|
public class TelemetryCmdsWrapper {
|
||||||
|
|
||||||
private List<AttributesSubscriptionCmd> attrSubCmds;
|
private List<AttributesSubscriptionCmd> attrSubCmds;
|
||||||
|
|
||||||
@ -63,14 +65,17 @@ public class WsCmdsWrapper {
|
|||||||
|
|
||||||
private List<AlarmCountUnsubscribeCmd> alarmCountUnsubscribeCmds;
|
private List<AlarmCountUnsubscribeCmd> alarmCountUnsubscribeCmds;
|
||||||
|
|
||||||
private NotificationsCountSubCmd unreadNotificationsCountSubCmd;
|
@JsonIgnore
|
||||||
|
public WsCommandsWrapper toCommonCmdsWrapper() {
|
||||||
private NotificationsSubCmd unreadNotificationsSubCmd;
|
return new WsCommandsWrapper(Stream.of(
|
||||||
|
attrSubCmds, tsSubCmds, historyCmds, entityDataCmds,
|
||||||
private MarkNotificationsAsReadCmd markNotificationAsReadCmd;
|
entityDataUnsubscribeCmds, alarmDataCmds, alarmDataUnsubscribeCmds,
|
||||||
|
entityCountCmds, entityCountUnsubscribeCmds,
|
||||||
private MarkAllNotificationsAsReadCmd markAllNotificationsAsReadCmd;
|
alarmCountCmds, alarmCountUnsubscribeCmds
|
||||||
|
)
|
||||||
private NotificationsUnsubCmd notificationsUnsubCmd;
|
.filter(Objects::nonNull)
|
||||||
|
.flatMap(Collection::stream)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -16,7 +16,7 @@
|
|||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
||||||
|
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.thingsboard.server.service.ws.telemetry.TelemetryFeature;
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -25,8 +25,8 @@ import org.thingsboard.server.service.ws.telemetry.TelemetryFeature;
|
|||||||
public class AttributesSubscriptionCmd extends SubscriptionCmd {
|
public class AttributesSubscriptionCmd extends SubscriptionCmd {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TelemetryFeature getType() {
|
public WsCmdType getType() {
|
||||||
return TelemetryFeature.ATTRIBUTES;
|
return WsCmdType.ATTRIBUTES;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -37,4 +38,8 @@ public class GetHistoryCmd implements TelemetryPluginCmd {
|
|||||||
private int limit;
|
private int limit;
|
||||||
private String agg;
|
private String agg;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.TIMESERIES_HISTORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,8 +32,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
|
|||||||
private String scope;
|
private String scope;
|
||||||
private boolean unsubscribe;
|
private boolean unsubscribe;
|
||||||
|
|
||||||
public abstract TelemetryFeature getType();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "SubscriptionCmd [entityType=" + entityType + ", entityId=" + entityId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
|
return "SubscriptionCmd [entityType=" + entityType + ", entityId=" + entityId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
|
||||||
|
|||||||
@ -15,7 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
||||||
|
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.WsCmd;
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
|
|||||||
@ -17,8 +17,9 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v1;
|
|||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.thingsboard.server.service.ws.telemetry.TelemetryFeature;
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -26,6 +27,7 @@ import org.thingsboard.server.service.ws.telemetry.TelemetryFeature;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@Data
|
@Data
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
|
public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
|
||||||
|
|
||||||
private long startTs;
|
private long startTs;
|
||||||
@ -35,7 +37,7 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
|
|||||||
private String agg;
|
private String agg;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TelemetryFeature getType() {
|
public WsCmdType getType() {
|
||||||
return TelemetryFeature.TIMESERIES;
|
return WsCmdType.TIMESERIES;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.query.AlarmCountQuery;
|
import org.thingsboard.server.common.data.query.AlarmCountQuery;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
public class AlarmCountCmd extends DataCmd {
|
public class AlarmCountCmd extends DataCmd {
|
||||||
|
|
||||||
@ -31,4 +32,9 @@ public class AlarmCountCmd extends DataCmd {
|
|||||||
super(cmdId);
|
super(cmdId);
|
||||||
this.query = query;
|
this.query = query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ALARM_COUNT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,10 +16,15 @@
|
|||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class AlarmCountUnsubscribeCmd implements UnsubscribeCmd {
|
public class AlarmCountUnsubscribeCmd implements UnsubscribeCmd {
|
||||||
|
|
||||||
private final int cmdId;
|
private final int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ALARM_COUNT_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.query.AlarmDataQuery;
|
import org.thingsboard.server.common.data.query.AlarmDataQuery;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
public class AlarmDataCmd extends DataCmd {
|
public class AlarmDataCmd extends DataCmd {
|
||||||
|
|
||||||
@ -30,4 +31,9 @@ public class AlarmDataCmd extends DataCmd {
|
|||||||
super(cmdId);
|
super(cmdId);
|
||||||
this.query = query;
|
this.query = query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ALARM_DATA;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,10 +16,15 @@
|
|||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class AlarmDataUnsubscribeCmd implements UnsubscribeCmd {
|
public class AlarmDataUnsubscribeCmd implements UnsubscribeCmd {
|
||||||
|
|
||||||
private final int cmdId;
|
private final int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ALARM_DATA_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,10 +17,10 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.WsCmd;
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class DataCmd implements WsCmd {
|
public abstract class DataCmd implements WsCmd {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final int cmdId;
|
private final int cmdId;
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
public class EntityCountCmd extends DataCmd {
|
public class EntityCountCmd extends DataCmd {
|
||||||
|
|
||||||
@ -31,4 +32,9 @@ public class EntityCountCmd extends DataCmd {
|
|||||||
super(cmdId);
|
super(cmdId);
|
||||||
this.query = query;
|
this.query = query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ENTITY_COUNT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,10 +16,15 @@
|
|||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class EntityCountUnsubscribeCmd implements UnsubscribeCmd {
|
public class EntityCountUnsubscribeCmd implements UnsubscribeCmd {
|
||||||
|
|
||||||
private final int cmdId;
|
private final int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ENTITY_COUNT_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
public class EntityDataCmd extends DataCmd {
|
public class EntityDataCmd extends DataCmd {
|
||||||
|
|
||||||
@ -62,4 +63,8 @@ public class EntityDataCmd extends DataCmd {
|
|||||||
return historyCmd != null || latestCmd != null || tsCmd != null || aggHistoryCmd != null || aggTsCmd != null;
|
return historyCmd != null || latestCmd != null || tsCmd != null || aggHistoryCmd != null || aggTsCmd != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ENTITY_DATA;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,10 +16,15 @@
|
|||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.service.ws.WsCmdType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class EntityDataUnsubscribeCmd implements UnsubscribeCmd {
|
public class EntityDataUnsubscribeCmd implements UnsubscribeCmd {
|
||||||
|
|
||||||
private final int cmdId;
|
private final int cmdId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WsCmdType getType() {
|
||||||
|
return WsCmdType.ENTITY_DATA_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||||
|
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.WsCmd;
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
|
||||||
public interface UnsubscribeCmd extends WsCmd {
|
public interface UnsubscribeCmd extends WsCmd {
|
||||||
|
|
||||||
|
|||||||
@ -52,8 +52,8 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest {
|
|||||||
@LocalServerPort
|
@LocalServerPort
|
||||||
protected int wsPort;
|
protected int wsPort;
|
||||||
|
|
||||||
private volatile TbTestWebSocketClient wsClient; // lazy
|
protected volatile TbTestWebSocketClient wsClient; // lazy
|
||||||
private volatile TbTestWebSocketClient anotherWsClient; // lazy
|
protected volatile TbTestWebSocketClient anotherWsClient; // lazy
|
||||||
|
|
||||||
public TbTestWebSocketClient getWsClient() {
|
public TbTestWebSocketClient getWsClient() {
|
||||||
if (wsClient == null) {
|
if (wsClient == null) {
|
||||||
@ -101,7 +101,11 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected TbTestWebSocketClient buildAndConnectWebSocketClient() throws URISyntaxException, InterruptedException {
|
protected TbTestWebSocketClient buildAndConnectWebSocketClient() throws URISyntaxException, InterruptedException {
|
||||||
TbTestWebSocketClient wsClient = new TbTestWebSocketClient(new URI(WS_URL + wsPort + "/api/ws/plugins/telemetry?token=" + token));
|
return buildAndConnectWebSocketClient("/api/ws");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TbTestWebSocketClient buildAndConnectWebSocketClient(String path) throws URISyntaxException, InterruptedException {
|
||||||
|
TbTestWebSocketClient wsClient = new TbTestWebSocketClient(new URI(WS_URL + wsPort + path + "?token=" + token));
|
||||||
assertThat(wsClient.connectBlocking(TIMEOUT, TimeUnit.SECONDS)).isTrue();
|
assertThat(wsClient.connectBlocking(TIMEOUT, TimeUnit.SECONDS)).isTrue();
|
||||||
return wsClient;
|
return wsClient;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
package org.thingsboard.server.controller;
|
package org.thingsboard.server.controller;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.client.WebSocketClient;
|
||||||
@ -28,11 +27,10 @@ import org.thingsboard.server.common.data.query.EntityDataPageLink;
|
|||||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||||
import org.thingsboard.server.common.data.query.EntityFilter;
|
import org.thingsboard.server.common.data.query.EntityFilter;
|
||||||
import org.thingsboard.server.common.data.query.EntityKey;
|
import org.thingsboard.server.common.data.query.EntityKey;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper;
|
import org.thingsboard.server.service.ws.WsCmd;
|
||||||
|
import org.thingsboard.server.service.ws.WsCommandsWrapper;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
|
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
|
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
||||||
@ -105,24 +103,6 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
|||||||
super.send(text);
|
super.send(text);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void send(EntityDataCmd cmd) throws NotYetConnectedException {
|
|
||||||
WsCmdsWrapper wrapper = new WsCmdsWrapper();
|
|
||||||
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
|
|
||||||
this.send(JacksonUtil.toString(wrapper));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void send(EntityCountCmd cmd) throws NotYetConnectedException {
|
|
||||||
WsCmdsWrapper wrapper = new WsCmdsWrapper();
|
|
||||||
wrapper.setEntityCountCmds(Collections.singletonList(cmd));
|
|
||||||
this.send(JacksonUtil.toString(wrapper));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void send(AlarmCountCmd cmd) throws NotYetConnectedException {
|
|
||||||
WsCmdsWrapper wrapper = new WsCmdsWrapper();
|
|
||||||
wrapper.setAlarmCountCmds(Collections.singletonList(cmd));
|
|
||||||
this.send(JacksonUtil.toString(wrapper));
|
|
||||||
}
|
|
||||||
|
|
||||||
public String waitForUpdate() {
|
public String waitForUpdate() {
|
||||||
return waitForUpdate(false);
|
return waitForUpdate(false);
|
||||||
}
|
}
|
||||||
@ -240,11 +220,7 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
|||||||
cmd.setEntityId(entityId.getId().toString());
|
cmd.setEntityId(entityId.getId().toString());
|
||||||
cmd.setScope(scope);
|
cmd.setScope(scope);
|
||||||
cmd.setKeys(String.join(",", keys));
|
cmd.setKeys(String.join(",", keys));
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
send(cmd);
|
||||||
cmdsWrapper.setAttrSubCmds(List.of(cmd));
|
|
||||||
JsonNode msg = JacksonUtil.valueToTree(cmdsWrapper);
|
|
||||||
((ObjectNode) msg.get("attrSubCmds").get(0)).remove("type");
|
|
||||||
send(msg.toString());
|
|
||||||
return JacksonUtil.toJsonNode(waitForReply());
|
return JacksonUtil.toJsonNode(waitForReply());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,4 +264,10 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
|||||||
return sendEntityDataQuery(edq);
|
return sendEntityDataQuery(edq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void send(WsCmd... cmds) {
|
||||||
|
WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
|
||||||
|
cmdsWrapper.setCmds(List.of(cmds));
|
||||||
|
send(JacksonUtil.toString(cmdsWrapper));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -648,7 +648,7 @@ public class WebsocketApiTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEntityCountCmd_filterTypeSingularCompatibilityTest() {
|
public void testEntityCountCmd_filterTypeSingularCompatibilityTest() throws Exception {
|
||||||
ObjectNode oldFormatDeviceTypeFilterSingular = JacksonUtil.newObjectNode();
|
ObjectNode oldFormatDeviceTypeFilterSingular = JacksonUtil.newObjectNode();
|
||||||
oldFormatDeviceTypeFilterSingular.put("type", "deviceType");
|
oldFormatDeviceTypeFilterSingular.put("type", "deviceType");
|
||||||
oldFormatDeviceTypeFilterSingular.put("deviceType", "default");
|
oldFormatDeviceTypeFilterSingular.put("deviceType", "default");
|
||||||
@ -667,9 +667,10 @@ public class WebsocketApiTest extends AbstractControllerTest {
|
|||||||
ObjectNode wrapperNode = JacksonUtil.newObjectNode();
|
ObjectNode wrapperNode = JacksonUtil.newObjectNode();
|
||||||
wrapperNode.set("entityCountCmds", entityCountCmds);
|
wrapperNode.set("entityCountCmds", entityCountCmds);
|
||||||
|
|
||||||
getWsClient().send(JacksonUtil.toString(wrapperNode));
|
wsClient = buildAndConnectWebSocketClient("/api/ws/plugins/telemetry");
|
||||||
|
wsClient.send(JacksonUtil.toString(wrapperNode));
|
||||||
|
|
||||||
EntityCountUpdate update = getWsClient().parseCountReply(getWsClient().waitForReply());
|
EntityCountUpdate update = wsClient.parseCountReply(wsClient.waitForReply());
|
||||||
Assert.assertEquals(1, update.getCmdId());
|
Assert.assertEquals(1, update.getCmdId());
|
||||||
Assert.assertEquals(1, update.getCount());
|
Assert.assertEquals(1, update.getCount());
|
||||||
|
|
||||||
|
|||||||
@ -29,7 +29,6 @@ import org.thingsboard.server.service.ws.notification.cmd.NotificationsCountSubC
|
|||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd;
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationsSubCmd;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate;
|
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate;
|
||||||
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
|
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper;
|
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@ -56,49 +55,34 @@ public class NotificationApiWsClient extends TbTestWebSocketClient {
|
|||||||
private final Map<Integer, UnreadNotificationsUpdate> lastUpdates = new ConcurrentHashMap<>();
|
private final Map<Integer, UnreadNotificationsUpdate> lastUpdates = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException {
|
public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException {
|
||||||
super(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + token));
|
super(new URI(wsUrl + "/api/ws?token=" + token));
|
||||||
}
|
}
|
||||||
|
|
||||||
public NotificationApiWsClient subscribeForUnreadNotifications(int limit, NotificationType... types) {
|
public NotificationApiWsClient subscribeForUnreadNotifications(int limit, NotificationType... types) {
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
send(new NotificationsSubCmd(newCmdId(), limit, Arrays.stream(types).collect(Collectors.toSet())));
|
||||||
cmdsWrapper.setUnreadNotificationsSubCmd(new NotificationsSubCmd(newCmdId(), limit, Arrays.stream(types).collect(Collectors.toSet())));
|
|
||||||
sendCmd(cmdsWrapper);
|
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int subscribeForUnreadNotificationsAndWait(int limit, NotificationType... types) {
|
public int subscribeForUnreadNotificationsAndWait(int limit, NotificationType... types) {
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
|
||||||
int subId = newCmdId();
|
int subId = newCmdId();
|
||||||
cmdsWrapper.setUnreadNotificationsSubCmd(new NotificationsSubCmd(subId, limit, Arrays.stream(types).collect(Collectors.toSet())));
|
send(new NotificationsSubCmd(subId, limit, Arrays.stream(types).collect(Collectors.toSet())));
|
||||||
sendCmd(cmdsWrapper);
|
|
||||||
waitForReply();
|
waitForReply();
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
return subId;
|
return subId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public NotificationApiWsClient subscribeForUnreadNotificationsCount() {
|
public NotificationApiWsClient subscribeForUnreadNotificationsCount() {
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
send(new NotificationsCountSubCmd(newCmdId()));
|
||||||
cmdsWrapper.setUnreadNotificationsCountSubCmd(new NotificationsCountSubCmd(newCmdId()));
|
|
||||||
sendCmd(cmdsWrapper);
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markNotificationAsRead(UUID... notifications) {
|
public void markNotificationAsRead(UUID... notifications) {
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
send(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications)));
|
||||||
cmdsWrapper.setMarkNotificationAsReadCmd(new MarkNotificationsAsReadCmd(newCmdId(), Arrays.asList(notifications)));
|
|
||||||
sendCmd(cmdsWrapper);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markAllNotificationsAsRead() {
|
public void markAllNotificationsAsRead() {
|
||||||
WsCmdsWrapper cmdsWrapper = new WsCmdsWrapper();
|
send(new MarkAllNotificationsAsReadCmd(newCmdId()));
|
||||||
cmdsWrapper.setMarkAllNotificationsAsReadCmd(new MarkAllNotificationsAsReadCmd(newCmdId()));
|
|
||||||
sendCmd(cmdsWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sendCmd(WsCmdsWrapper cmdsWrapper) {
|
|
||||||
String cmd = JacksonUtil.toString(cmdsWrapper);
|
|
||||||
send(cmd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -63,7 +63,7 @@ import org.thingsboard.server.common.data.query.SingleEntityFilter;
|
|||||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.WsCmdsWrapper;
|
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
|
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
|
||||||
@ -229,10 +229,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
|
|||||||
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
|
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
|
||||||
|
|
||||||
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
|
EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
|
||||||
WsCmdsWrapper wrapper = new WsCmdsWrapper();
|
getWsClient().send(cmd);
|
||||||
wrapper.setEntityDataCmds(Collections.singletonList(cmd));
|
|
||||||
|
|
||||||
getWsClient().send(JacksonUtil.toString(wrapper));
|
|
||||||
getWsClient().waitForReply();
|
getWsClient().waitForReply();
|
||||||
|
|
||||||
getWsClient().registerWaitForUpdate();
|
getWsClient().registerWaitForUpdate();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user