Add more logging; fix race condition in WS client

This commit is contained in:
ViacheslavKlimov 2023-01-09 13:08:15 +02:00
parent 1ffc8107d5
commit 8a12c63400
7 changed files with 58 additions and 15 deletions

View File

@ -17,7 +17,7 @@
-->
<!DOCTYPE configuration>
<configuration>
<configuration scan="true" scanPeriod="30 seconds">
<appender name="fileLogAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
@ -41,10 +41,8 @@
<logger name="org" level="WARN"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.monitoring" level="TRACE"/>
<logger name="org.thingsboard.monitoring.client" level="WARN"/>
<logger name="org.openqa.selenium" level="WARN"/>
<logger name="io.github.bonigarcia" level="WARN"/>
<logger name="org.thingsboard.monitoring" level="DEBUG"/>
<logger name="org.thingsboard.monitoring.client" level="INFO"/>
<root level="INFO">
<appender-ref ref="fileLogAppender"/>

View File

@ -33,14 +33,18 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class WsClient extends WebSocketClient {
private volatile String lastMsg;
public volatile String lastMsg;
private CountDownLatch reply;
private CountDownLatch update;
private final Lock updateLock = new ReentrantLock();
public WsClient(URI serverUri) {
super(serverUri);
}
@ -52,12 +56,21 @@ public class WsClient extends WebSocketClient {
@Override
public void onMessage(String s) {
lastMsg = s;
if (reply != null) {
reply.countDown();
log.trace("Received new msg: {}", s);
if (s == null) {
return;
}
if (update != null) {
update.countDown();
updateLock.lock();
try {
lastMsg = s;
if (update != null) {
update.countDown();
}
if (reply != null) {
reply.countDown();
}
} finally {
updateLock.unlock();
}
}
@ -72,13 +85,24 @@ public class WsClient extends WebSocketClient {
}
public void registerWaitForUpdate() {
lastMsg = null;
update = new CountDownLatch(1);
updateLock.lock();
try {
lastMsg = null;
update = new CountDownLatch(1);
} finally {
updateLock.unlock();
}
log.trace("Registered wait for update");
}
@Override
public void send(String text) throws NotYetConnectedException {
reply = new CountDownLatch(1);
updateLock.lock();
try {
reply = new CountDownLatch(1);
} finally {
updateLock.unlock();
}
super.send(text);
}
@ -93,27 +117,33 @@ public class WsClient extends WebSocketClient {
CmdsWrapper wrapper = new CmdsWrapper();
wrapper.setTsSubCmds(List.of(subCmd));
send(JacksonUtil.toString(wrapper));
log.trace("Subscribed for telemetry (key: {})", telemetryKey);
}
public JsonNode waitForUpdate(long ms) {
log.trace("update latch count: {}", update.getCount());
try {
if (update.await(ms, TimeUnit.MILLISECONDS)) {
log.trace("Waited for update");
return getLastMsg();
}
} catch (InterruptedException e) {
log.debug("Failed to await reply", e);
}
log.trace("No update arrived within {} ms", ms);
return null;
}
public JsonNode waitForReply(int ms) {
try {
if (reply.await(ms, TimeUnit.MILLISECONDS)) {
log.trace("Waited for reply");
return getLastMsg();
}
} catch (InterruptedException e) {
log.debug("Failed to await reply", e);
}
log.trace("No reply arrived within {} ms", ms);
return null;
}

View File

@ -18,6 +18,5 @@ package org.thingsboard.monitoring.data;
public class MonitoredServiceKey {
public static final String GENERAL = "Monitoring";
public static final String WEB_UI = "Web UI";
}

View File

@ -81,6 +81,7 @@ public abstract class TransportMonitoringService<C extends TransportMonitoringSe
monitoringExecutor.scheduleWithFixedDelay(() -> {
WsClient wsClient = null;
try {
log.trace("[{}] Checking", transportInfo);
wsClient = establishWsClient();
wsClient.registerWaitForUpdate();
@ -88,13 +89,16 @@ public abstract class TransportMonitoringService<C extends TransportMonitoringSe
String testPayload = JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString();
try {
initClientAndSendPayload(testPayload);
log.trace("[{}] Sent test payload ({})", transportInfo, testPayload);
} catch (Throwable e) {
throw new TransportFailureException(e);
}
log.trace("[{}] Waiting for WS update", transportInfo);
checkWsUpdate(wsClient, testValue);
monitoringReporter.serviceIsOk(transportInfo);
monitoringReporter.serviceIsOk(MonitoredServiceKey.GENERAL);
} catch (TransportFailureException transportFailureException) {
monitoringReporter.serviceFailure(transportInfo, transportFailureException);
} catch (Exception e) {
@ -141,6 +145,7 @@ public abstract class TransportMonitoringService<C extends TransportMonitoringSe
private void checkWsUpdate(WsClient wsClient, String testValue) {
stopWatch.start();
wsClient.waitForUpdate(wsConfig.getResultCheckTimeoutMs());
log.trace("[{}] Waited for WS update. Last WS msg: {}", transportInfo, wsClient.lastMsg);
Object update = wsClient.getTelemetryKeyUpdate(TEST_TELEMETRY_KEY);
if (update == null) {
throw new TransportFailureException("No WS update arrived within " + wsConfig.getResultCheckTimeoutMs() + " ms");

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.monitoring.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.coap.CoAP;
@ -31,6 +32,7 @@ import java.io.IOException;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class CoapTransportMonitoringService extends TransportMonitoringService<CoapTransportMonitoringServiceConfig> {
private CoapClient coapClient;
@ -46,6 +48,7 @@ public class CoapTransportMonitoringService extends TransportMonitoringService<C
String uri = target.getBaseUrl() + "/api/v1/" + accessToken + "/telemetry";
coapClient = new CoapClient(uri);
coapClient.setTimeout((long) config.getRequestTimeoutMs());
log.debug("Initialized CoAP client for URI {}", uri);
}
}
@ -63,6 +66,7 @@ public class CoapTransportMonitoringService extends TransportMonitoringService<C
if (coapClient != null) {
coapClient.shutdown();
coapClient = null;
log.info("Disconnected CoAP client");
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.monitoring.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Scope;
@ -29,6 +30,7 @@ import java.time.Duration;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class HttpTransportMonitoringService extends TransportMonitoringService<HttpTransportMonitoringServiceConfig> {
private RestTemplate restTemplate;
@ -44,6 +46,7 @@ public class HttpTransportMonitoringService extends TransportMonitoringService<H
.setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs()))
.setReadTimeout(Duration.ofMillis(config.getRequestTimeoutMs()))
.build();
log.debug("Initialized HTTP client");
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.monitoring.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
@ -31,6 +32,7 @@ import org.thingsboard.monitoring.service.TransportMonitoringService;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class MqttTransportMonitoringService extends TransportMonitoringService<MqttTransportMonitoringServiceConfig> {
private MqttClient mqttClient;
@ -56,6 +58,7 @@ public class MqttTransportMonitoringService extends TransportMonitoringService<M
if (result.getException() != null) {
throw result.getException();
}
log.debug("Initialized MQTT client for URI {}", mqttClient.getServerURI());
}
}
@ -72,6 +75,7 @@ public class MqttTransportMonitoringService extends TransportMonitoringService<M
if (mqttClient != null) {
mqttClient.disconnect();
mqttClient = null;
log.info("Disconnected MQTT client");
}
}