From 914d4b8ca378957f522a9f0d551b22fd388ab333 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 12 Sep 2024 14:00:40 +0200 Subject: [PATCH] ranemed latency to metrics --- .../src/main/resources/thingsboard.yml | 4 +- ...AbstractMqttTimeseriesIntegrationTest.java | 47 +++++++++---------- .../data/device/profile/MqttTopics.java | 2 +- .../transport/mqtt/MqttTransportContext.java | 4 +- .../transport/mqtt/MqttTransportHandler.java | 2 +- ...ervice.java => GatewayMetricsService.java} | 36 +++++++------- .../GatewayMetricsData.java} | 4 +- .../GatewayMetricsState.java} | 38 +++++++-------- .../mqtt/session/GatewaySessionHandler.java | 14 +++--- .../src/main/resources/tb-mqtt-transport.yml | 2 + 10 files changed, 78 insertions(+), 75 deletions(-) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/{GatewayLatencyService.java => GatewayMetricsService.java} (73%) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/{latency/GatewayLatencyData.java => metrics/GatewayMetricsData.java} (83%) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/{latency/GatewayLatencyState.java => metrics/GatewayMetricsState.java} (71%) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2fc35d915a..6b61838918 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1020,8 +1020,8 @@ transport: # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism - # Interval of periodic report of the gateway latency - gateway_latency_report_interval_sec: "${MQTT_GATEWAY_LATENCY_REPORT_INTERVAL_SEC:3600}" + # Interval of periodic report of the gateway metrics + gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" netty: # Netty leak detector level leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 886cf1e309..4f89a8018e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -25,13 +25,12 @@ import org.mockito.ArgumentCaptor; import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; -import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; -import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; -import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; +import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; +import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; +import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; @@ -56,9 +55,9 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; -import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_LATENCY_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC; -import static org.thingsboard.server.transport.mqtt.mqttv3.credentials.BasicMqttCredentialsTest.CLIENT_ID; +import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.METRICS_CHECK; @Slf4j public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest { @@ -70,7 +69,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt " \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}"; @SpyBean - GatewayLatencyService gatewayLatencyService; + GatewayMetricsService gatewayMetricsService; @Before public void beforeTest() throws Exception { @@ -133,9 +132,9 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt } @Test - public void testPushLatencyGateway() throws Exception { + public void testPushMetricsGateway() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() - .gatewayName("Test latency gateway") + .gatewayName("Test metrics gateway") .build(); processBeforeTest(configProperties); @@ -147,16 +146,16 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt publishLatency(client, gwLatencies, transportLatencies, 5); - gatewayLatencyService.reportLatency(); + gatewayMetricsService.reportMetrics(); - List actualKeys = getActualKeysList(savedGateway.getId(), List.of("latencyCheck")); - assertEquals("latencyCheck", actualKeys.get(0)); + List actualKeys = getActualKeysList(savedGateway.getId(), List.of(METRICS_CHECK)); + assertEquals(METRICS_CHECK, actualKeys.get(0)); - String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=latencyCheck", savedGateway.getId(), 0, System.currentTimeMillis()); + String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=%s", savedGateway.getId(), 0, System.currentTimeMillis(), METRICS_CHECK); Map>> gatewayTelemetry = doGetAsyncTyped(telemetryUrl, new TypeReference<>() {}); - Map latencyCheckTelemetry = gatewayTelemetry.get("latencyCheck").get(0); - Map latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {}); + Map latencyCheckTelemetry = gatewayTelemetry.get(METRICS_CHECK).get(0); + Map latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {}); assertNotNull(latencyCheckValue); gwLatencies.forEach((connectorName, gwLatencyList) -> { @@ -171,7 +170,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt long minTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).min().getAsLong(); long maxTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).max().getAsLong(); - GatewayLatencyState.ConnectorLatencyResult connectorLatencyResult = latencyCheckValue.get(connectorName); + GatewayMetricsState.ConnectorMetricsResult connectorLatencyResult = latencyCheckValue.get(connectorName); assertNotNull(connectorLatencyResult); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); }); @@ -180,27 +179,27 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt Awaitility.await() .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> verify(gatewayLatencyService).onDeviceDisconnect(savedGateway.getId())); + .untilAsserted(() -> verify(gatewayMetricsService).onDeviceDisconnect(savedGateway.getId())); } private void publishLatency(MqttTestClient client, Map> gwLatencies, Map> transportLatencies, int n) throws Exception { Random random = new Random(); for (int i = 0; i < n; i++) { - Map data = new HashMap<>(); + Map data = new HashMap<>(); long publishedTs = System.currentTimeMillis() - 10; long gatewayLatencyA = random.nextLong(100, 500); - data.put("connectorA", new GatewayLatencyData(publishedTs - gatewayLatencyA, publishedTs)); + data.put("connectorA", new GatewayMetricsData(publishedTs - gatewayLatencyA, publishedTs)); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); boolean sendB = i % 2 == 0; if (sendB) { long gatewayLatencyB = random.nextLong(120, 450); - data.put("connectorB", new GatewayLatencyData(publishedTs - gatewayLatencyB, publishedTs)); + data.put("connectorB", new GatewayMetricsData(publishedTs - gatewayLatencyB, publishedTs)); gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); } - client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data)); + client.publishAndWait(GATEWAY_METRICS_TOPIC, JacksonUtil.writeValueAsBytes(data)); ArgumentCaptor transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); - verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); + verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); Long transportReceiveTs = transportReceiveTsCaptor.getValue(); Long transportLatency = transportReceiveTs - publishedTs; transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency); @@ -210,13 +209,13 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt } } - private void checkConnectorLatencyResult(GatewayLatencyState.ConnectorLatencyResult result, long avgGwLatency, long minGwLatency, long maxGwLatency, + private void checkConnectorLatencyResult(GatewayMetricsState.ConnectorMetricsResult result, long avgGwLatency, long minGwLatency, long maxGwLatency, long avgTransportLatency, long minTransportLatency, long maxTransportLatency) { assertNotNull(result); assertEquals(avgGwLatency, result.avgGwLatency()); assertEquals(minGwLatency, result.minGwLatency()); assertEquals(maxGwLatency, result.maxGwLatency()); - assertEquals(avgTransportLatency, result.transportLatencyAvg()); + assertEquals(avgTransportLatency, result.avgTransportLatency()); assertEquals(minTransportLatency, result.minTransportLatency()); assertEquals(maxTransportLatency, result.maxTransportLatency()); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java index 8945f2d084..0590d57034 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java @@ -75,7 +75,7 @@ public class MqttTopics { public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE; - public static final String GATEWAY_LATENCY_TOPIC = BASE_GATEWAY_API_TOPIC + "/latency"; + public static final String GATEWAY_METRICS_TOPIC = BASE_GATEWAY_API_TOPIC + "/metrics"; // v2 topics public static final String BASE_DEVICE_API_TOPIC_V2 = "v2"; public static final String REQUEST_ID_PATTERN = "(?\\d+)"; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index dc61a47d78..d78a4afbc2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -27,7 +27,7 @@ import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; -import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; +import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; @@ -58,7 +58,7 @@ public class MqttTransportContext extends TransportContext { @Getter @Autowired - private GatewayLatencyService gatewayLatencyService; + private GatewayMetricsService gatewayMetricsService; @Getter @Value("${transport.mqtt.netty.max_payload_size}") diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 4ae0d54a6b..1eee03f722 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -423,7 +423,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_DISCONNECT_TOPIC: gatewaySessionHandler.onDeviceDisconnect(mqttMsg); break; - case MqttTopics.GATEWAY_LATENCY_TOPIC: + case MqttTopics.GATEWAY_METRICS_TOPIC: gatewaySessionHandler.onGatewayLatency(mqttMsg); break; default: diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java similarity index 73% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java index acf10a03b8..263888f4c2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayLatencyService.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java @@ -27,8 +27,8 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; -import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; -import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; +import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; +import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -37,10 +37,12 @@ import java.util.concurrent.TimeUnit; @Slf4j @Service @TbMqttTransportComponent -public class GatewayLatencyService { +public class GatewayMetricsService { - @Value("${transport.mqtt.gateway_latency_report_interval_sec:3600}") - private int latencyReportIntervalSec; + public static final String METRICS_CHECK = "metricsCheck"; + + @Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}") + private int metricsReportIntervalSec; @Autowired private SchedulerComponent scheduler; @@ -48,15 +50,15 @@ public class GatewayLatencyService { @Autowired private TransportService transportService; - private Map states = new ConcurrentHashMap<>(); + private Map states = new ConcurrentHashMap<>(); @PostConstruct private void init() { - scheduler.scheduleAtFixedRate(this::reportLatency, latencyReportIntervalSec, latencyReportIntervalSec, TimeUnit.SECONDS); + scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS); } - public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map data, long ts) { - states.computeIfAbsent(gatewayId, k -> new GatewayLatencyState(sessionInfo)).update(ts, data); + public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map data, long ts) { + states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(ts, data); } public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) { @@ -71,34 +73,34 @@ public class GatewayLatencyService { } public void onDeviceDisconnect(DeviceId deviceId) { - GatewayLatencyState state = states.remove(deviceId); + GatewayMetricsState state = states.remove(deviceId); if (state != null) { - reportLatency(state, System.currentTimeMillis()); + reportMetrics(state, System.currentTimeMillis()); } } - public void reportLatency() { + public void reportMetrics() { if (states.isEmpty()) { return; } - Map oldStates = states; + Map oldStates = states; states = new ConcurrentHashMap<>(); long ts = System.currentTimeMillis(); oldStates.forEach((gatewayId, state) -> { - reportLatency(state, ts); + reportMetrics(state, ts); }); oldStates.clear(); } - private void reportLatency(GatewayLatencyState state, long ts) { + private void reportMetrics(GatewayMetricsState state, long ts) { if (state.isEmpty()) { return; } - var result = state.getLatencyStateResult(); + var result = state.getStateResult(); var kvProto = TransportProtos.KeyValueProto.newBuilder() - .setKey("latencyCheck") + .setKey(METRICS_CHECK) .setType(TransportProtos.KeyValueType.JSON_V) .setJsonV(JacksonUtil.toString(result)) .build(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java similarity index 83% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java index 2cae428a02..23cc730b86 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyData.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.transport.mqtt.gateway.latency; +package org.thingsboard.server.transport.mqtt.gateway.metrics; -public record GatewayLatencyData(long receivedTs, long publishedTs) { +public record GatewayMetricsData(long receivedTs, long publishedTs) { } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java similarity index 71% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java index 3ad75fee75..45f945151b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/latency/GatewayLatencyState.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.transport.mqtt.gateway.latency; +package org.thingsboard.server.transport.mqtt.gateway.metrics; import lombok.Getter; import org.thingsboard.server.gen.transport.TransportProtos; @@ -25,15 +25,15 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class GatewayLatencyState { +public class GatewayMetricsState { - private final Map connectors; + private final Map connectors; private final Lock updateLock; @Getter private volatile TransportProtos.SessionInfoProto sessionInfo; - public GatewayLatencyState(TransportProtos.SessionInfoProto sessionInfo) { + public GatewayMetricsState(TransportProtos.SessionInfoProto sessionInfo) { this.connectors = new HashMap<>(); this.updateLock = new ReentrantLock(); this.sessionInfo = sessionInfo; @@ -43,19 +43,19 @@ public class GatewayLatencyState { this.sessionInfo = sessionInfo; } - public void update(long ts, Map latencyData) { + public void update(long ts, Map metricsData) { updateLock.lock(); try { - latencyData.forEach((connectorName, data) -> { - connectors.computeIfAbsent(connectorName, k -> new ConnectorLatencyState()).update(ts, data); + metricsData.forEach((connectorName, data) -> { + connectors.computeIfAbsent(connectorName, k -> new ConnectorMetricsState()).update(ts, data); }); } finally { updateLock.unlock(); } } - public Map getLatencyStateResult() { - Map result = new HashMap<>(); + public Map getStateResult() { + Map result = new HashMap<>(); updateLock.lock(); try { connectors.forEach((name, state) -> result.put(name, state.getResult())); @@ -71,7 +71,7 @@ public class GatewayLatencyState { return connectors.isEmpty(); } - private static class ConnectorLatencyState { + private static class ConnectorMetricsState { private final AtomicInteger count; private final AtomicLong gwLatencySum; private final AtomicLong transportLatencySum; @@ -80,15 +80,15 @@ public class GatewayLatencyState { private volatile long minTransportLatency; private volatile long maxTransportLatency; - private ConnectorLatencyState() { + private ConnectorMetricsState() { this.count = new AtomicInteger(0); this.gwLatencySum = new AtomicLong(0); this.transportLatencySum = new AtomicLong(0); } - private void update(long serverReceiveTs, GatewayLatencyData latencyData) { - long gwLatency = latencyData.publishedTs() - latencyData.receivedTs(); - long transportLatency = serverReceiveTs - latencyData.publishedTs(); + private void update(long serverReceiveTs, GatewayMetricsData metricsData) { + long gwLatency = metricsData.publishedTs() - metricsData.receivedTs(); + long transportLatency = serverReceiveTs - metricsData.publishedTs(); count.incrementAndGet(); gwLatencySum.addAndGet(gwLatency); transportLatencySum.addAndGet(transportLatency); @@ -106,16 +106,16 @@ public class GatewayLatencyState { } } - private ConnectorLatencyResult getResult() { + private ConnectorMetricsResult getResult() { long count = this.count.get(); long avgGwLatency = gwLatencySum.get() / count; - long transportLatencyAvg = transportLatencySum.get() / count; - return new ConnectorLatencyResult(avgGwLatency, minGwLatency, maxGwLatency, transportLatencyAvg, minTransportLatency, maxTransportLatency); + long avgTransportLatency = transportLatencySum.get() / count; + return new ConnectorMetricsResult(avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); } } - public record ConnectorLatencyResult(long avgGwLatency, long minGwLatency, long maxGwLatency, - long transportLatencyAvg, long minTransportLatency, long maxTransportLatency) { + public record ConnectorMetricsResult(long avgGwLatency, long minGwLatency, long maxGwLatency, + long avgTransportLatency, long minTransportLatency, long maxTransportLatency) { } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index a7634593ce..32eaf6e0d7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -27,7 +27,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; +import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import java.util.Optional; import java.util.UUID; @@ -40,11 +40,11 @@ import static com.amazonaws.util.StringUtils.UTF8; @Slf4j public class GatewaySessionHandler extends AbstractGatewaySessionHandler { - private final GatewayLatencyService latencyService; + private final GatewayMetricsService gatewayMetricsService; public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { super(deviceSessionCtx, sessionId, overwriteDevicesActivity); - this.latencyService = deviceSessionCtx.getContext().getGatewayLatencyService(); + this.gatewayMetricsService = deviceSessionCtx.getContext().getGatewayMetricsService(); } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { @@ -72,16 +72,16 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler deviceProfileOpt) { this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); - latencyService.onDeviceUpdate(sessionInfo, gateway.getDeviceId()); + gatewayMetricsService.onDeviceUpdate(sessionInfo, gateway.getDeviceId()); } public void onGatewayDelete(DeviceId deviceId) { - latencyService.onDeviceDelete(deviceId); + gatewayMetricsService.onDeviceDelete(deviceId); } public void onGatewayLatency(MqttPublishMessage mqttMsg) throws AdaptorException { @@ -94,7 +94,7 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler() {}), ts); + gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), JacksonUtil.fromString(payload, new TypeReference<>() {}), ts); ack(msgId, MqttReasonCodes.PubAck.SUCCESS); } catch (Throwable t) { ackOrClose(msgId); diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 140839ccc3..d2ac105623 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -146,6 +146,8 @@ transport: # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism + # Interval of periodic report of the gateway metrics + gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" netty: # Netty leak detector level leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"