Add more trace logging

This commit is contained in:
ViacheslavKlimov 2023-01-11 14:30:10 +02:00
parent 8a12c63400
commit 8c585fbd14
2 changed files with 15 additions and 11 deletions

View File

@ -35,11 +35,13 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@ -69,14 +71,15 @@ public class MonitoringReporter {
@EventListener(ApplicationReadyEvent.class) @EventListener(ApplicationReadyEvent.class)
public void startLatenciesMonitoring() { public void startLatenciesMonitoring() {
monitoringExecutor.scheduleWithFixedDelay(() -> { monitoringExecutor.scheduleWithFixedDelay(() -> {
List<Latency> latencies = this.latencies.values().stream()
.filter(Latency::isNotEmpty)
.collect(Collectors.toList());
if (latencies.isEmpty()) { if (latencies.isEmpty()) {
return; return;
} }
log.info("Latencies:\n{}", latencies.values()); log.info("Latencies:\n{}", latencies);
if (latencies.values().stream() if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) {
.filter(Latency::isNotEmpty) HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs);
.anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) {
HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies.values(), latencyThresholdMs);
notificationService.sendNotification(highLatencyNotification); notificationService.sendNotification(highLatencyNotification);
} }
@ -90,11 +93,9 @@ public class MonitoringReporter {
} }
tbClient.logIn(); tbClient.logIn();
ObjectNode msg = JacksonUtil.newObjectNode(); ObjectNode msg = JacksonUtil.newObjectNode();
latencies.forEach((key, latency) -> { latencies.forEach(latency -> {
if (latency.isNotEmpty()) { msg.set(latency.getKey(), new DoubleNode(latency.getAvg()));
msg.set(key, new DoubleNode(latency.getAvg()));
latency.reset(); latency.reset();
}
}); });
tbClient.saveEntityTelemetry(entityId, "time", msg); tbClient.saveEntityTelemetry(entityId, "time", msg);
} catch (Exception e) { } catch (Exception e) {
@ -107,7 +108,8 @@ public class MonitoringReporter {
public void reportLatency(String key, long latencyInNanos) { public void reportLatency(String key, long latencyInNanos) {
String latencyKey = key + "Latency"; String latencyKey = key + "Latency";
double latencyInMs = (double) latencyInNanos / 1000_000; double latencyInMs = (double) latencyInNanos / 1000_000;
latencies.computeIfAbsent(key, k -> new Latency(latencyKey)).report(latencyInMs); log.trace("Reporting latency [{}]: {} ms", key, latencyInMs);
latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs);
} }
public void serviceFailure(Object serviceKey, Exception error) { public void serviceFailure(Object serviceKey, Exception error) {

View File

@ -133,9 +133,11 @@ public abstract class TransportMonitoringService<C extends TransportMonitoringSe
private WsClient establishWsClient() throws Exception { private WsClient establishWsClient() throws Exception {
stopWatch.start(); stopWatch.start();
String accessToken = tbClient.logIn(); String accessToken = tbClient.logIn();
log.trace("[{}] Received new access token", transportInfo);
monitoringReporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime()); monitoringReporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime());
WsClient wsClient = wsClientFactory.createClient(accessToken); WsClient wsClient = wsClientFactory.createClient(accessToken);
log.trace("[{}] Created WS client", transportInfo);
wsClient.subscribeForTelemetry(target.getDevice().getId(), TEST_TELEMETRY_KEY); wsClient.subscribeForTelemetry(target.getDevice().getId(), TEST_TELEMETRY_KEY);
Optional.ofNullable(wsClient.waitForReply(wsConfig.getRequestTimeoutMs())) Optional.ofNullable(wsClient.waitForReply(wsConfig.getRequestTimeoutMs()))
.orElseThrow(() -> new IllegalStateException("Failed to subscribe for telemetry")); .orElseThrow(() -> new IllegalStateException("Failed to subscribe for telemetry"));