From 06f30e1de534b6946f4dc08206024dca5de2e9e0 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 9 Sep 2024 23:00:29 +0200 Subject: [PATCH] minor refactoring --- ...AbstractMqttTimeseriesIntegrationTest.java | 55 ++++++++++--------- .../mqtt/gateway/GatewayLatencyService.java | 3 +- .../gateway/latency/GatewayLatencyData.java | 2 +- .../gateway/latency/GatewayLatencyState.java | 7 +-- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 6deceb6fb9..2aeecfb663 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -40,7 +40,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.OptionalLong; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -137,7 +136,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt client.connectAndWait(gatewayAccessToken); Map> gwLatencies = new HashMap<>(); - List transportLatencies = new ArrayList<>(); + Map> transportLatencies = new HashMap<>(); publishLatency(client, gwLatencies, transportLatencies, 5); @@ -153,16 +152,18 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt Map latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {}); assertNotNull(latencyCheckValue); - long avgTransportLatency = (long) transportLatencies.stream().mapToLong(Long::longValue).average().getAsDouble(); - long minTransportLatency = transportLatencies.stream().mapToLong(Long::longValue).min().getAsLong(); - long maxTransportLatency = transportLatencies.stream().mapToLong(Long::longValue).max().getAsLong(); - - gwLatencies.forEach((connectorName, gwLatencyList) -> { long avgGwLatency = (long) gwLatencyList.stream().mapToLong(Long::longValue).average().getAsDouble(); long minGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).min().getAsLong(); long maxGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).max().getAsLong(); + List transportLatencyList = transportLatencies.get(connectorName); + assertNotNull(transportLatencyList); + + long avgTransportLatency = (long) transportLatencyList.stream().mapToLong(Long::longValue).average().getAsDouble(); + long minTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).min().getAsLong(); + long maxTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).max().getAsLong(); + GatewayLatencyState.ConnectorLatencyResult connectorLatencyResult = latencyCheckValue.get(connectorName); assertNotNull(connectorLatencyResult); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); @@ -175,31 +176,33 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt .untilAsserted(() -> verify(gatewayLatencyService).onDeviceDisconnect(savedGateway.getId())); } - private void publishLatency(MqttTestClient client, Map> gwLatencies, List transportLatencies, int n) throws Exception { + private void publishLatency(MqttTestClient client, Map> gwLatencies, Map> transportLatencies, int n) throws Exception { Random random = new Random(); for (int i = 0; i < n; i++) { - long publishedTs = System.currentTimeMillis(); - long gatewayLatencyA = random.nextLong(1000, 5000); - long gatewayLatencyB = random.nextLong(1200, 4500); - long transportReceiveTs = publishLatencyAndGetTransportReceiveTs(client, publishedTs, gatewayLatencyA, gatewayLatencyB); + Map data = new HashMap<>(); + long publishedTs = System.currentTimeMillis() - 10; + long gatewayLatencyA = random.nextLong(100, 500); + data.put("connectorA", new GatewayLatencyData(publishedTs - gatewayLatencyA, publishedTs)); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); - gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); - transportLatencies.add(transportReceiveTs - publishedTs); - Thread.sleep(1); + boolean sendB = i % 2 == 0; + if (sendB) { + long gatewayLatencyB = random.nextLong(120, 450); + data.put("connectorB", new GatewayLatencyData(publishedTs - gatewayLatencyB, publishedTs)); + gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); + } + + client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data)); + ArgumentCaptor transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); + verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); + Long transportReceiveTs = transportReceiveTsCaptor.getValue(); + Long transportLatency = transportReceiveTs - publishedTs; + transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency); + if (sendB) { + transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatency); + } } } - private long publishLatencyAndGetTransportReceiveTs(MqttTestClient client, long publishedTs, long gatewayLatencyA, long gatewayLatencyB) throws Exception { - List data = new ArrayList<>(); - data.add(new GatewayLatencyData("connectorA", publishedTs - gatewayLatencyA, publishedTs)); - data.add(new GatewayLatencyData("connectorB", publishedTs - gatewayLatencyB, publishedTs)); - - client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data)); - ArgumentCaptor transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); - verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); - return transportReceiveTsCaptor.getValue(); - } - private void checkConnectorLatencyResult(GatewayLatencyState.ConnectorLatencyResult result, long avgGwLatency, long minGwLatency, long maxGwLatency, long avgTransportLatency, long minTransportLatency, long maxTransportLatency) { assertNotNull(result); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java index c6e9002877..148a8819ee 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java @@ -30,7 +30,6 @@ import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -56,7 +55,7 @@ public class GatewayLatencyService { scheduler.scheduleAtFixedRate(this::reportLatency, latencyReportIntervalSec, latencyReportIntervalSec, TimeUnit.SECONDS); } - public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List data, long ts) { + public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map data, long ts) { states.computeIfAbsent(gatewayId, k -> new GatewayLatencyState(sessionInfo)).update(ts, data); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java index 6260c8356e..2cae428a02 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java @@ -15,5 +15,5 @@ */ package org.thingsboard.server.transport.mqtt.gateway.latency; -public record GatewayLatencyData(String connectorName, long receivedTs, long publishedTs) { +public record GatewayLatencyData(long receivedTs, long publishedTs) { } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java index 3ed902f81d..c139bb3f19 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java @@ -19,7 +19,6 @@ import lombok.Getter; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -44,11 +43,11 @@ public class GatewayLatencyState { this.sessionInfo = sessionInfo; } - public void update(long ts, List latencyData) { + public void update(long ts, Map latencyData) { updateLock.lock(); try { - latencyData.forEach(data -> { - connectors.computeIfAbsent(data.connectorName(), k -> new ConnectorLatencyState()).update(ts, data); + latencyData.forEach((connectorName, data) -> { + connectors.computeIfAbsent(connectorName, k -> new ConnectorLatencyState()).update(ts, data); }); } finally { updateLock.unlock();