minor refactoring

This commit is contained in:
YevhenBondarenko 2024-09-09 23:00:29 +02:00
parent d4c4bf5656
commit 06f30e1de5
4 changed files with 34 additions and 33 deletions

View File

@ -40,7 +40,6 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.OptionalLong;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -137,7 +136,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
client.connectAndWait(gatewayAccessToken); client.connectAndWait(gatewayAccessToken);
Map<String, List<Long>> gwLatencies = new HashMap<>(); Map<String, List<Long>> gwLatencies = new HashMap<>();
List<Long> transportLatencies = new ArrayList<>(); Map<String, List<Long>> transportLatencies = new HashMap<>();
publishLatency(client, gwLatencies, transportLatencies, 5); publishLatency(client, gwLatencies, transportLatencies, 5);
@ -153,16 +152,18 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
Map<String, GatewayLatencyState.ConnectorLatencyResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {}); Map<String, GatewayLatencyState.ConnectorLatencyResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {});
assertNotNull(latencyCheckValue); assertNotNull(latencyCheckValue);
long avgTransportLatency = (long) transportLatencies.stream().mapToLong(Long::longValue).average().getAsDouble();
long minTransportLatency = transportLatencies.stream().mapToLong(Long::longValue).min().getAsLong();
long maxTransportLatency = transportLatencies.stream().mapToLong(Long::longValue).max().getAsLong();
gwLatencies.forEach((connectorName, gwLatencyList) -> { gwLatencies.forEach((connectorName, gwLatencyList) -> {
long avgGwLatency = (long) gwLatencyList.stream().mapToLong(Long::longValue).average().getAsDouble(); long avgGwLatency = (long) gwLatencyList.stream().mapToLong(Long::longValue).average().getAsDouble();
long minGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).min().getAsLong(); long minGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).min().getAsLong();
long maxGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).max().getAsLong(); long maxGwLatency = gwLatencyList.stream().mapToLong(Long::longValue).max().getAsLong();
List<Long> transportLatencyList = transportLatencies.get(connectorName);
assertNotNull(transportLatencyList);
long avgTransportLatency = (long) transportLatencyList.stream().mapToLong(Long::longValue).average().getAsDouble();
long minTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).min().getAsLong();
long maxTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).max().getAsLong();
GatewayLatencyState.ConnectorLatencyResult connectorLatencyResult = latencyCheckValue.get(connectorName); GatewayLatencyState.ConnectorLatencyResult connectorLatencyResult = latencyCheckValue.get(connectorName);
assertNotNull(connectorLatencyResult); assertNotNull(connectorLatencyResult);
checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency);
@ -175,31 +176,33 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
.untilAsserted(() -> verify(gatewayLatencyService).onDeviceDisconnect(savedGateway.getId())); .untilAsserted(() -> verify(gatewayLatencyService).onDeviceDisconnect(savedGateway.getId()));
} }
private void publishLatency(MqttTestClient client, Map<String, List<Long>> gwLatencies, List<Long> transportLatencies, int n) throws Exception { private void publishLatency(MqttTestClient client, Map<String, List<Long>> gwLatencies, Map<String, List<Long>> transportLatencies, int n) throws Exception {
Random random = new Random(); Random random = new Random();
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
long publishedTs = System.currentTimeMillis(); Map<String, GatewayLatencyData> data = new HashMap<>();
long gatewayLatencyA = random.nextLong(1000, 5000); long publishedTs = System.currentTimeMillis() - 10;
long gatewayLatencyB = random.nextLong(1200, 4500); long gatewayLatencyA = random.nextLong(100, 500);
long transportReceiveTs = publishLatencyAndGetTransportReceiveTs(client, publishedTs, gatewayLatencyA, gatewayLatencyB); data.put("connectorA", new GatewayLatencyData(publishedTs - gatewayLatencyA, publishedTs));
gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA);
gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); boolean sendB = i % 2 == 0;
transportLatencies.add(transportReceiveTs - publishedTs); if (sendB) {
Thread.sleep(1); long gatewayLatencyB = random.nextLong(120, 450);
data.put("connectorB", new GatewayLatencyData(publishedTs - gatewayLatencyB, publishedTs));
gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB);
}
client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data));
ArgumentCaptor<Long> transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class);
verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture());
Long transportReceiveTs = transportReceiveTsCaptor.getValue();
Long transportLatency = transportReceiveTs - publishedTs;
transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency);
if (sendB) {
transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatency);
}
} }
} }
private long publishLatencyAndGetTransportReceiveTs(MqttTestClient client, long publishedTs, long gatewayLatencyA, long gatewayLatencyB) throws Exception {
List<GatewayLatencyData> data = new ArrayList<>();
data.add(new GatewayLatencyData("connectorA", publishedTs - gatewayLatencyA, publishedTs));
data.add(new GatewayLatencyData("connectorB", publishedTs - gatewayLatencyB, publishedTs));
client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data));
ArgumentCaptor<Long> transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class);
verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture());
return transportReceiveTsCaptor.getValue();
}
private void checkConnectorLatencyResult(GatewayLatencyState.ConnectorLatencyResult result, long avgGwLatency, long minGwLatency, long maxGwLatency, private void checkConnectorLatencyResult(GatewayLatencyState.ConnectorLatencyResult result, long avgGwLatency, long minGwLatency, long maxGwLatency,
long avgTransportLatency, long minTransportLatency, long maxTransportLatency) { long avgTransportLatency, long minTransportLatency, long maxTransportLatency) {
assertNotNull(result); assertNotNull(result);

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -56,7 +55,7 @@ public class GatewayLatencyService {
scheduler.scheduleAtFixedRate(this::reportLatency, latencyReportIntervalSec, latencyReportIntervalSec, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::reportLatency, latencyReportIntervalSec, latencyReportIntervalSec, TimeUnit.SECONDS);
} }
public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List<GatewayLatencyData> data, long ts) { public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map<String, GatewayLatencyData> data, long ts) {
states.computeIfAbsent(gatewayId, k -> new GatewayLatencyState(sessionInfo)).update(ts, data); states.computeIfAbsent(gatewayId, k -> new GatewayLatencyState(sessionInfo)).update(ts, data);
} }

View File

@ -15,5 +15,5 @@
*/ */
package org.thingsboard.server.transport.mqtt.gateway.latency; package org.thingsboard.server.transport.mqtt.gateway.latency;
public record GatewayLatencyData(String connectorName, long receivedTs, long publishedTs) { public record GatewayLatencyData(long receivedTs, long publishedTs) {
} }

View File

@ -19,7 +19,6 @@ import lombok.Getter;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -44,11 +43,11 @@ public class GatewayLatencyState {
this.sessionInfo = sessionInfo; this.sessionInfo = sessionInfo;
} }
public void update(long ts, List<GatewayLatencyData> latencyData) { public void update(long ts, Map<String, GatewayLatencyData> latencyData) {
updateLock.lock(); updateLock.lock();
try { try {
latencyData.forEach(data -> { latencyData.forEach((connectorName, data) -> {
connectors.computeIfAbsent(data.connectorName(), k -> new ConnectorLatencyState()).update(ts, data); connectors.computeIfAbsent(connectorName, k -> new ConnectorLatencyState()).update(ts, data);
}); });
} finally { } finally {
updateLock.unlock(); updateLock.unlock();