From 1b9dfa17c25c188d67a1b86076c765da5538bed9 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 9 Oct 2023 18:24:43 +0300 Subject: [PATCH] Refactoring for latencies tracking --- .../monitoring/data/Latencies.java | 4 ++ .../thingsboard/monitoring/data/Latency.java | 48 ++----------------- .../notification/HighLatencyNotification.java | 10 ++-- .../monitoring/service/BaseHealthChecker.java | 2 +- .../service/BaseMonitoringService.java | 5 +- .../service/MonitoringReporter.java | 26 +++++----- 6 files changed, 30 insertions(+), 65 deletions(-) diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 3370d42462..52e73a64cd 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -25,4 +25,8 @@ public class Latencies { return String.format("%sRequest", key); } + public static String wsUpdate(String key) { + return String.format("%sWsUpdate", key); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java index c64291e30f..4b17bf179f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java @@ -15,53 +15,15 @@ */ package org.thingsboard.monitoring.data; -import com.google.common.util.concurrent.AtomicDouble; -import lombok.RequiredArgsConstructor; +import lombok.Data; -import java.util.concurrent.atomic.AtomicInteger; - -@RequiredArgsConstructor +@Data(staticConstructor = "of") public class Latency { - private final String key; - private final AtomicDouble latencySum = new AtomicDouble(); - private final AtomicInteger counter = new AtomicInteger(); + private final double value; - public synchronized void report(double latencyInMs) { - latencySum.addAndGet(latencyInMs); - counter.incrementAndGet(); - } - - public synchronized double getAvg() { - return latencySum.get() / counter.get(); - } - - public boolean isNotEmpty() { - return counter.get() > 0; - } - - public synchronized void reset() { - latencySum.set(0.0); - counter.set(0); - } - - public String getKey() { - return key; - } - - public synchronized Latency snapshot() { - Latency snapshot = new Latency(key); - snapshot.latencySum.set(latencySum.get()); - snapshot.counter.set(counter.get()); - return snapshot; - } - - @Override - public String toString() { - return "Latency{" + - "key='" + key + '\'' + - ", avgLatency=" + getAvg() + - '}'; + public String getFormattedValue() { + return String.format("%,.2f ms", value); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java index 939cb7440f..5744e40d41 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -21,11 +21,11 @@ import java.util.Collection; public class HighLatencyNotification implements Notification { - private final Collection latencies; + private final Collection highLatencies; private final int thresholdMs; - public HighLatencyNotification(Collection latencies, int thresholdMs) { - this.latencies = latencies; + public HighLatencyNotification(Collection highLatencies, int thresholdMs) { + this.highLatencies = highLatencies; this.thresholdMs = thresholdMs; } @@ -33,8 +33,8 @@ public class HighLatencyNotification implements Notification { public String getText() { StringBuilder text = new StringBuilder(); text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); - latencies.forEach(latency -> { - text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg())); + highLatencies.forEach(latency -> { + text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue())); }); return text.toString(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index affca1ce09..697573289d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -96,7 +96,7 @@ public abstract class BaseHealthChecker, T extends MonitoringTarget> { - @Autowired + @Autowired(required = false) private List configs; private final List> healthCheckers = new LinkedList<>(); private final List devices = new LinkedList<>(); @@ -54,6 +54,9 @@ public abstract class BaseMonitoringService, T ext @PostConstruct private void init() { + if (configs == null || configs.isEmpty()) { + return; + } tbClient.logIn(); configs.forEach(config -> { config.getTargets().forEach(target -> { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java index f41454a76a..aabf04caee 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -63,25 +63,20 @@ public class MonitoringReporter { private String reportingAssetId; public void reportLatencies(TbClient tbClient) { - List latencies = this.latencies.values().stream() - .filter(Latency::isNotEmpty) - .map(latency -> { - Latency snapshot = latency.snapshot(); - latency.reset(); - return snapshot; - }) - .collect(Collectors.toList()); if (latencies.isEmpty()) { return; } - log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms") + log.debug("Latencies:\n{}", latencies.values().stream().map(latency -> latency.getKey() + ": " + latency.getFormattedValue()) .collect(Collectors.joining("\n")) + "\n"); - if (!latencyReportingEnabled) return; - if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) { - HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs); + List highLatencies = latencies.values().stream() + .filter(latency -> latency.getValue() >= (double) latencyThresholdMs) + .collect(Collectors.toList()); + if (!highLatencies.isEmpty()) { + HighLatencyNotification highLatencyNotification = new HighLatencyNotification(highLatencies, latencyThresholdMs); notificationService.sendNotification(highLatencyNotification); + log.warn("{}", highLatencyNotification.getText()); } try { @@ -99,10 +94,11 @@ public class MonitoringReporter { } ObjectNode msg = JacksonUtil.newObjectNode(); - latencies.forEach(latency -> { - msg.set(latency.getKey(), new DoubleNode(latency.getAvg())); + latencies.values().forEach(latency -> { + msg.set(latency.getKey(), new DoubleNode(latency.getValue())); }); tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg); + latencies.clear(); } catch (Exception e) { log.error("Failed to report latencies: {}", e.getMessage()); } @@ -112,7 +108,7 @@ public class MonitoringReporter { String latencyKey = key + "Latency"; double latencyInMs = (double) latencyInNanos / 1000_000; log.trace("Reporting latency [{}]: {} ms", key, latencyInMs); - latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs); + latencies.put(latencyKey, Latency.of(latencyKey, latencyInMs)); } public void serviceFailure(Object serviceKey, Throwable error) {