Refactoring for latencies tracking
This commit is contained in:
		
							parent
							
								
									333ab064cc
								
							
						
					
					
						commit
						1b9dfa17c2
					
				@ -25,4 +25,8 @@ public class Latencies {
 | 
			
		||||
        return String.format("%sRequest", key);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static String wsUpdate(String key) {
 | 
			
		||||
        return String.format("%sWsUpdate", key);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,53 +15,15 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.monitoring.data;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.AtomicDouble;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Data(staticConstructor = "of")
 | 
			
		||||
public class Latency {
 | 
			
		||||
 | 
			
		||||
    private final String key;
 | 
			
		||||
    private final AtomicDouble latencySum = new AtomicDouble();
 | 
			
		||||
    private final AtomicInteger counter = new AtomicInteger();
 | 
			
		||||
    private final double value;
 | 
			
		||||
 | 
			
		||||
    public synchronized void report(double latencyInMs) {
 | 
			
		||||
        latencySum.addAndGet(latencyInMs);
 | 
			
		||||
        counter.incrementAndGet();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public synchronized double getAvg() {
 | 
			
		||||
        return latencySum.get() / counter.get();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean isNotEmpty() {
 | 
			
		||||
        return counter.get() > 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public synchronized void reset() {
 | 
			
		||||
        latencySum.set(0.0);
 | 
			
		||||
        counter.set(0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getKey() {
 | 
			
		||||
        return key;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public synchronized Latency snapshot() {
 | 
			
		||||
        Latency snapshot = new Latency(key);
 | 
			
		||||
        snapshot.latencySum.set(latencySum.get());
 | 
			
		||||
        snapshot.counter.set(counter.get());
 | 
			
		||||
        return snapshot;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String toString() {
 | 
			
		||||
        return "Latency{" +
 | 
			
		||||
                "key='" + key + '\'' +
 | 
			
		||||
                ", avgLatency=" + getAvg() +
 | 
			
		||||
                '}';
 | 
			
		||||
    public String getFormattedValue() {
 | 
			
		||||
        return String.format("%,.2f ms", value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -21,11 +21,11 @@ import java.util.Collection;
 | 
			
		||||
 | 
			
		||||
public class HighLatencyNotification implements Notification {
 | 
			
		||||
 | 
			
		||||
    private final Collection<Latency> latencies;
 | 
			
		||||
    private final Collection<Latency> highLatencies;
 | 
			
		||||
    private final int thresholdMs;
 | 
			
		||||
 | 
			
		||||
    public HighLatencyNotification(Collection<Latency> latencies, int thresholdMs) {
 | 
			
		||||
        this.latencies = latencies;
 | 
			
		||||
    public HighLatencyNotification(Collection<Latency> highLatencies, int thresholdMs) {
 | 
			
		||||
        this.highLatencies = highLatencies;
 | 
			
		||||
        this.thresholdMs = thresholdMs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -33,8 +33,8 @@ public class HighLatencyNotification implements Notification {
 | 
			
		||||
    public String getText() {
 | 
			
		||||
        StringBuilder text = new StringBuilder();
 | 
			
		||||
        text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n");
 | 
			
		||||
        latencies.forEach(latency -> {
 | 
			
		||||
            text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg()));
 | 
			
		||||
        highLatencies.forEach(latency -> {
 | 
			
		||||
            text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue()));
 | 
			
		||||
        });
 | 
			
		||||
        return text.toString();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -96,7 +96,7 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
 | 
			
		||||
        } else if (!update.toString().equals(testValue)) {
 | 
			
		||||
            throw new ServiceFailureException("Was expecting value " + testValue + " but got " + update);
 | 
			
		||||
        }
 | 
			
		||||
        reporter.reportLatency(Latencies.WS_UPDATE, stopWatch.getTime());
 | 
			
		||||
        reporter.reportLatency(Latencies.wsUpdate(getKey()), stopWatch.getTime());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -36,7 +36,7 @@ import java.util.UUID;
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T extends MonitoringTarget> {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    private List<C> configs;
 | 
			
		||||
    private final List<BaseHealthChecker<C, T>> healthCheckers = new LinkedList<>();
 | 
			
		||||
    private final List<UUID> devices = new LinkedList<>();
 | 
			
		||||
@ -54,6 +54,9 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        if (configs == null || configs.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        tbClient.logIn();
 | 
			
		||||
        configs.forEach(config -> {
 | 
			
		||||
            config.getTargets().forEach(target -> {
 | 
			
		||||
 | 
			
		||||
@ -63,25 +63,20 @@ public class MonitoringReporter {
 | 
			
		||||
    private String reportingAssetId;
 | 
			
		||||
 | 
			
		||||
    public void reportLatencies(TbClient tbClient) {
 | 
			
		||||
        List<Latency> latencies = this.latencies.values().stream()
 | 
			
		||||
                .filter(Latency::isNotEmpty)
 | 
			
		||||
                .map(latency -> {
 | 
			
		||||
                    Latency snapshot = latency.snapshot();
 | 
			
		||||
                    latency.reset();
 | 
			
		||||
                    return snapshot;
 | 
			
		||||
                })
 | 
			
		||||
                .collect(Collectors.toList());
 | 
			
		||||
        if (latencies.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms")
 | 
			
		||||
        log.debug("Latencies:\n{}", latencies.values().stream().map(latency -> latency.getKey() + ": " + latency.getFormattedValue())
 | 
			
		||||
                .collect(Collectors.joining("\n")) + "\n");
 | 
			
		||||
 | 
			
		||||
        if (!latencyReportingEnabled) return;
 | 
			
		||||
 | 
			
		||||
        if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) {
 | 
			
		||||
            HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs);
 | 
			
		||||
        List<Latency> highLatencies = latencies.values().stream()
 | 
			
		||||
                .filter(latency -> latency.getValue() >= (double) latencyThresholdMs)
 | 
			
		||||
                .collect(Collectors.toList());
 | 
			
		||||
        if (!highLatencies.isEmpty()) {
 | 
			
		||||
            HighLatencyNotification highLatencyNotification = new HighLatencyNotification(highLatencies, latencyThresholdMs);
 | 
			
		||||
            notificationService.sendNotification(highLatencyNotification);
 | 
			
		||||
            log.warn("{}", highLatencyNotification.getText());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
@ -99,10 +94,11 @@ public class MonitoringReporter {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            ObjectNode msg = JacksonUtil.newObjectNode();
 | 
			
		||||
            latencies.forEach(latency -> {
 | 
			
		||||
                msg.set(latency.getKey(), new DoubleNode(latency.getAvg()));
 | 
			
		||||
            latencies.values().forEach(latency -> {
 | 
			
		||||
                msg.set(latency.getKey(), new DoubleNode(latency.getValue()));
 | 
			
		||||
            });
 | 
			
		||||
            tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg);
 | 
			
		||||
            latencies.clear();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to report latencies: {}", e.getMessage());
 | 
			
		||||
        }
 | 
			
		||||
@ -112,7 +108,7 @@ public class MonitoringReporter {
 | 
			
		||||
        String latencyKey = key + "Latency";
 | 
			
		||||
        double latencyInMs = (double) latencyInNanos / 1000_000;
 | 
			
		||||
        log.trace("Reporting latency [{}]: {} ms", key, latencyInMs);
 | 
			
		||||
        latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs);
 | 
			
		||||
        latencies.put(latencyKey, Latency.of(latencyKey, latencyInMs));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void serviceFailure(Object serviceKey, Throwable error) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user