diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 11e4078c97..200a6c71d2 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -85,13 +85,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke @Value("${server.ws.ping_timeout:30000}") private long pingTimeout; -<<<<<<< HEAD - private ConcurrentMap blacklistedSessions = new ConcurrentHashMap<>(); - private ConcurrentMap perSessionUpdateLimits = new ConcurrentHashMap<>(); -======= - private final ConcurrentMap blacklistedSessions = new ConcurrentHashMap<>(); + private final ConcurrentMap blacklistedSessions = new ConcurrentHashMap<>(); private final ConcurrentMap perSessionUpdateLimits = new ConcurrentHashMap<>(); ->>>>>>> upstream/develop/3.5 private final ConcurrentMap> tenantSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap> customerSessionsMap = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 75b333d35b..f4dc689f35 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -218,7 +218,6 @@ public class DefaultWebSocketService implements WebSocketService { } try { -<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java switch (sessionRef.getSessionType()) { case TELEMETRY: processTelemetryCmds(sessionRef, msg); @@ -226,45 +225,6 @@ public class DefaultWebSocketService implements WebSocketService { case NOTIFICATIONS: processNotificationCmds(sessionRef, msg); break; -======= - TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.OBJECT_MAPPER.readValue(msg, TelemetryPluginCmdsWrapper.class); - if (cmdsWrapper != null) { - if (cmdsWrapper.getAttrSubCmds() != null) { - cmdsWrapper.getAttrSubCmds().forEach(cmd -> { - if (processSubscription(sessionRef, cmd)) { - handleWsAttributesSubscriptionCmd(sessionRef, cmd); - } - }); - } - if (cmdsWrapper.getTsSubCmds() != null) { - cmdsWrapper.getTsSubCmds().forEach(cmd -> { - if (processSubscription(sessionRef, cmd)) { - handleWsTimeseriesSubscriptionCmd(sessionRef, cmd); - } - }); - } - if (cmdsWrapper.getHistoryCmds() != null) { - cmdsWrapper.getHistoryCmds().forEach(cmd -> handleWsHistoryCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getEntityDataCmds() != null) { - cmdsWrapper.getEntityDataCmds().forEach(cmd -> handleWsEntityDataCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getAlarmDataCmds() != null) { - cmdsWrapper.getAlarmDataCmds().forEach(cmd -> handleWsAlarmDataCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getEntityCountCmds() != null) { - cmdsWrapper.getEntityCountCmds().forEach(cmd -> handleWsEntityCountCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getEntityDataUnsubscribeCmds() != null) { - cmdsWrapper.getEntityDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getAlarmDataUnsubscribeCmds() != null) { - cmdsWrapper.getAlarmDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); - } - if (cmdsWrapper.getEntityCountUnsubscribeCmds() != null) { - cmdsWrapper.getEntityCountUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); - } ->>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java } } catch (IOException e) { log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); @@ -272,9 +232,8 @@ public class DefaultWebSocketService implements WebSocketService { } } - private void processTelemetryCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { - TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class); + TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, TelemetryPluginCmdsWrapper.class); if (cmdsWrapper == null) { return; } @@ -287,7 +246,7 @@ public class DefaultWebSocketService implements WebSocketService { } private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException { - NotificationCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, NotificationCmdsWrapper.class); + NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class); for (WsCmdHandler cmdHandler : notificationCmdsHandlers) { WsCmd cmd = cmdHandler.extractCmd(cmdsWrapper); if (cmd != null) { @@ -526,18 +485,14 @@ public class DefaultWebSocketService implements WebSocketService { .allKeys(false) .keyStates(subState) .scope(scope) -<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java - .updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update)) -======= - .updateConsumer((sessionId, update) -> { + .updateProcessor((subscription, update) -> { subLock.lock(); try { - sendWsMsg(sessionId, update); + sendWsMsg(subscription.getSessionId(), update); } finally { subLock.unlock(); } }) ->>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java .build(); subLock.lock(); @@ -643,15 +598,10 @@ public class DefaultWebSocketService implements WebSocketService { .entityId(entityId) .allKeys(true) .keyStates(subState) -<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java - .updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update)) - .scope(scope).build(); - oldSubService.addSubscription(sub); -======= - .updateConsumer((sessionId, update) -> { + .updateProcessor((subscription, update) -> { subLock.lock(); try { - sendWsMsg(sessionId, update); + sendWsMsg(subscription.getSessionId(), update); } finally { subLock.unlock(); } @@ -666,7 +616,6 @@ public class DefaultWebSocketService implements WebSocketService { } finally { subLock.unlock(); } ->>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java } @Override @@ -749,20 +698,17 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) -<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java - .updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update)) -======= - .updateConsumer((sessionId, update) -> { + .updateProcessor((subscription, update) -> { subLock.lock(); try { - sendWsMsg(sessionId, update); + sendWsMsg(subscription.getSessionId(), update); } finally { subLock.unlock(); } }) ->>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java .allKeys(true) - .keyStates(subState).build(); + .keyStates(subState) + .build(); subLock.lock(); try { @@ -805,20 +751,17 @@ public class DefaultWebSocketService implements WebSocketService { .subscriptionId(cmd.getCmdId()) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityId) -<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java - .updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update)) -======= - .updateConsumer((sessionId, update) -> { + .updateProcessor((subscription, update) -> { subLock.lock(); try { - sendWsMsg(sessionId, update); + sendWsMsg(subscription.getSessionId(), update); } finally { subLock.unlock(); } }) ->>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java .allKeys(false) - .keyStates(subState).build(); + .keyStates(subState) + .build(); subLock.lock(); try{ diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index 53de755bc3..5809ab5e42 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -52,12 +52,8 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest { @LocalServerPort protected int wsPort; -<<<<<<< HEAD - private TbTestWebSocketClient wsClient; // lazy - private TbTestWebSocketClient anotherWsClient; // lazy -======= private volatile TbTestWebSocketClient wsClient; // lazy ->>>>>>> upstream/develop/3.5 + private volatile TbTestWebSocketClient anotherWsClient; // lazy public TbTestWebSocketClient getWsClient() { if (wsClient == null) { diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 95370b0ecf..2366a631fa 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -48,11 +48,9 @@ import java.util.concurrent.TimeUnit; @Slf4j public class TbTestWebSocketClient extends WebSocketClient { -<<<<<<< HEAD - @Getter -======= private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30); ->>>>>>> upstream/develop/3.5 + + @Getter private volatile String lastMsg; private volatile CountDownLatch reply; private volatile CountDownLatch update; @@ -100,11 +98,7 @@ public class TbTestWebSocketClient extends WebSocketClient { @Override public void send(String text) throws NotYetConnectedException { -<<<<<<< HEAD - log.info("SENDING: {}", text); -======= log.debug("send [{}]", text); ->>>>>>> upstream/develop/3.5 reply = new CountDownLatch(1); super.send(text); } @@ -122,12 +116,11 @@ public class TbTestWebSocketClient extends WebSocketClient { } public String waitForUpdate() { -<<<<<<< HEAD return waitForUpdate(false); } public String waitForUpdate(boolean throwExceptionOnTimeout) { - return waitForUpdate(TimeUnit.SECONDS.toMillis(3), throwExceptionOnTimeout); + return waitForUpdate(TIMEOUT, throwExceptionOnTimeout); } public String waitForUpdate(long ms) { @@ -135,19 +128,12 @@ public class TbTestWebSocketClient extends WebSocketClient { } public String waitForUpdate(long ms, boolean throwExceptionOnTimeout) { + log.debug("waitForUpdate [{}]", ms); try { if (update.await(ms, TimeUnit.MILLISECONDS)) { return lastMsg; -======= - return waitForUpdate(TIMEOUT); - } - - public String waitForUpdate(long ms) { - log.debug("waitForUpdate [{}]", ms); - try { - if (!update.await(ms, TimeUnit.MILLISECONDS)) { + } else { log.warn("Failed to await update (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace")); ->>>>>>> upstream/develop/3.5 } } catch (InterruptedException e) { log.warn("Failed to await update", e); @@ -160,30 +146,26 @@ public class TbTestWebSocketClient extends WebSocketClient { } public String waitForReply() { -<<<<<<< HEAD return waitForReply(false); } public String waitForReply(boolean throwExceptionOnTimeout) { - try { - if (reply.await(3, TimeUnit.SECONDS)) { - return lastMsg; -======= - return waitForReply(TIMEOUT); + return waitForReply(TIMEOUT, throwExceptionOnTimeout); } - public String waitForReply(long ms) { + public String waitForReply(long ms, boolean throwExceptionOnTimeout) { log.debug("waitForReply [{}]", ms); try { - if (!reply.await(ms, TimeUnit.MILLISECONDS)) { + if (reply.await(ms, TimeUnit.MILLISECONDS)) { + return lastMsg; + } else { log.warn("Failed to await reply (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace")); ->>>>>>> upstream/develop/3.5 } } catch (InterruptedException e) { log.warn("Failed to await reply", e); } if (throwExceptionOnTimeout) { - throw new AssertionError("Waited for reply for 3 seconds but none arrived"); + throw new AssertionError("Waited for reply for " + ms + " ms but none arrived"); } else { return null; }