From 8a12c63400b0433ca2f30ea68afd1f14f9a12f30 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 9 Jan 2023 13:08:15 +0200 Subject: [PATCH] Add more logging; fix race condition in WS client --- monitoring/src/main/conf/logback.xml | 8 ++-- .../monitoring/client/WsClient.java | 48 +++++++++++++++---- .../monitoring/data/MonitoredServiceKey.java | 1 - .../service/TransportMonitoringService.java | 5 ++ .../impl/CoapTransportMonitoringService.java | 4 ++ .../impl/HttpTransportMonitoringService.java | 3 ++ .../impl/MqttTransportMonitoringService.java | 4 ++ 7 files changed, 58 insertions(+), 15 deletions(-) diff --git a/monitoring/src/main/conf/logback.xml b/monitoring/src/main/conf/logback.xml index ea90a898cd..5d37a17110 100644 --- a/monitoring/src/main/conf/logback.xml +++ b/monitoring/src/main/conf/logback.xml @@ -17,7 +17,7 @@ --> - + @@ -41,10 +41,8 @@ - - - - + + diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java index 1c33da0338..56d29de5fa 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java @@ -33,14 +33,18 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j public class WsClient extends WebSocketClient { - private volatile String lastMsg; + public volatile String lastMsg; private CountDownLatch reply; private CountDownLatch update; + private final Lock updateLock = new ReentrantLock(); + public WsClient(URI serverUri) { super(serverUri); } @@ -52,12 +56,21 @@ public class WsClient extends WebSocketClient { @Override public void onMessage(String s) { - lastMsg = s; - if (reply != null) { - reply.countDown(); + log.trace("Received new msg: {}", s); + if (s == null) { + return; } - if (update != null) { - update.countDown(); + updateLock.lock(); + try { + lastMsg = s; + if (update != null) { + update.countDown(); + } + if (reply != null) { + reply.countDown(); + } + } finally { + updateLock.unlock(); } } @@ -72,13 +85,24 @@ public class WsClient extends WebSocketClient { } public void registerWaitForUpdate() { - lastMsg = null; - update = new CountDownLatch(1); + updateLock.lock(); + try { + lastMsg = null; + update = new CountDownLatch(1); + } finally { + updateLock.unlock(); + } + log.trace("Registered wait for update"); } @Override public void send(String text) throws NotYetConnectedException { - reply = new CountDownLatch(1); + updateLock.lock(); + try { + reply = new CountDownLatch(1); + } finally { + updateLock.unlock(); + } super.send(text); } @@ -93,27 +117,33 @@ public class WsClient extends WebSocketClient { CmdsWrapper wrapper = new CmdsWrapper(); wrapper.setTsSubCmds(List.of(subCmd)); send(JacksonUtil.toString(wrapper)); + log.trace("Subscribed for telemetry (key: {})", telemetryKey); } public JsonNode waitForUpdate(long ms) { + log.trace("update latch count: {}", update.getCount()); try { if (update.await(ms, TimeUnit.MILLISECONDS)) { + log.trace("Waited for update"); return getLastMsg(); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); } + log.trace("No update arrived within {} ms", ms); return null; } public JsonNode waitForReply(int ms) { try { if (reply.await(ms, TimeUnit.MILLISECONDS)) { + log.trace("Waited for reply"); return getLastMsg(); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); } + log.trace("No reply arrived within {} ms", ms); return null; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java index f2227978ae..57609d609a 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java @@ -18,6 +18,5 @@ package org.thingsboard.monitoring.data; public class MonitoredServiceKey { public static final String GENERAL = "Monitoring"; - public static final String WEB_UI = "Web UI"; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java index e27002449e..6cddf05469 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java @@ -81,6 +81,7 @@ public abstract class TransportMonitoringService { WsClient wsClient = null; try { + log.trace("[{}] Checking", transportInfo); wsClient = establishWsClient(); wsClient.registerWaitForUpdate(); @@ -88,13 +89,16 @@ public abstract class TransportMonitoringService { private CoapClient coapClient; @@ -46,6 +48,7 @@ public class CoapTransportMonitoringService extends TransportMonitoringService { private RestTemplate restTemplate; @@ -44,6 +46,7 @@ public class HttpTransportMonitoringService extends TransportMonitoringService { private MqttClient mqttClient; @@ -56,6 +58,7 @@ public class MqttTransportMonitoringService extends TransportMonitoringService