From 137b1732bbc8ec769ac5c2d5b98b2b247a7fe26e Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 12 Sep 2024 22:28:10 +0200 Subject: [PATCH] implemented telemetry metadata instead of separate topic for metrics --- .../src/main/resources/thingsboard.yml | 2 +- ...AbstractMqttTimeseriesIntegrationTest.java | 73 ++++++++++--------- .../data/device/profile/MqttTopics.java | 1 - .../server/common/adaptor/JsonConverter.java | 20 ++++- .../transport/mqtt/MqttTransportHandler.java | 5 +- .../mqtt/gateway/GatewayMetricsService.java | 19 ++--- .../gateway/metrics/GatewayMetricsData.java | 2 +- .../gateway/metrics/GatewayMetricsState.java | 9 ++- .../AbstractGatewaySessionHandler.java | 22 +++++- .../mqtt/session/GatewaySessionHandler.java | 31 -------- .../src/main/resources/tb-mqtt-transport.yml | 2 +- 11 files changed, 91 insertions(+), 95 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6b61838918..718c4823c4 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1021,7 +1021,7 @@ transport: disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism # Interval of periodic report of the gateway metrics - gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" + gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:60}" netty: # Netty leak detector level leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 32854a8735..f5e21e9e18 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -18,7 +18,6 @@ package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.awaitility.Awaitility; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -55,7 +54,6 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; -import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC; import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.GATEWAY_METRICS; @@ -124,8 +122,8 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt String deviceName = "Device A"; Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), - 20, - 100); + 20, + 100); assertNotNull(device); client.disconnect(); @@ -138,13 +136,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt .build(); processBeforeTest(configProperties); - MqttTestClient client = new MqttTestClient(); - client.connectAndWait(gatewayAccessToken); - Map> gwLatencies = new HashMap<>(); Map> transportLatencies = new HashMap<>(); - publishLatency(client, gwLatencies, transportLatencies, 5); + publishLatency(gwLatencies, transportLatencies, 5); gatewayMetricsService.reportMetrics(); @@ -174,38 +169,37 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt assertNotNull(connectorLatencyResult); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); }); - - client.disconnect(); - - Awaitility.await() - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> verify(gatewayMetricsService).onDeviceDisconnect(savedGateway.getId())); } - private void publishLatency(MqttTestClient client, Map> gwLatencies, Map> transportLatencies, int n) throws Exception { + private void publishLatency(Map> gwLatencies, Map> transportLatencies, int n) throws Exception { Random random = new Random(); for (int i = 0; i < n; i++) { - Map data = new HashMap<>(); long publishedTs = System.currentTimeMillis() - 10; long gatewayLatencyA = random.nextLong(100, 500); - data.put("connectorA", new GatewayMetricsData(publishedTs - gatewayLatencyA, publishedTs)); + var firstData = new GatewayMetricsData("connectorA", publishedTs - gatewayLatencyA, publishedTs); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); - boolean sendB = i % 2 == 0; - if (sendB) { - long gatewayLatencyB = random.nextLong(120, 450); - data.put("connectorB", new GatewayMetricsData(publishedTs - gatewayLatencyB, publishedTs)); - gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); - } + long gatewayLatencyB = random.nextLong(120, 450); + var secondData = new GatewayMetricsData("connectorB", publishedTs - gatewayLatencyB, publishedTs); + gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); - client.publishAndWait(GATEWAY_METRICS_TOPIC, JacksonUtil.writeValueAsBytes(data)); - ArgumentCaptor transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); - verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); - Long transportReceiveTs = transportReceiveTsCaptor.getValue(); - Long transportLatency = transportReceiveTs - publishedTs; - transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency); - if (sendB) { - transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatency); - } + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + String deviceName1 = "Device A"; + String deviceName2 = "Device B"; + String firstMetadata = JacksonUtil.writeValueAsString(firstData); + String secondMetadata = JacksonUtil.writeValueAsString(secondData); + String payload = getGatewayTelemetryJsonPayloadWithMetadata(deviceName1, deviceName2, "10000", "20000", firstMetadata, secondMetadata); + processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2); + + ArgumentCaptor transportReceiveTsCaptorA = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor transportReceiveTsCaptorB = ArgumentCaptor.forClass(Long.class); + verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(List.of(firstData)), transportReceiveTsCaptorA.capture()); + verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(List.of(secondData)), transportReceiveTsCaptorB.capture()); + Long transportReceiveTsA = transportReceiveTsCaptorA.getValue(); + Long transportReceiveTsB = transportReceiveTsCaptorB.getValue(); + Long transportLatencyA = transportReceiveTsA - publishedTs; + Long transportLatencyB = transportReceiveTsB - publishedTs; + transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatencyA); + transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatencyB); } } @@ -250,7 +244,8 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt long end = System.currentTimeMillis() + 5000; Map>> values = null; while (start <= end) { - values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() {}); + values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() { + }); boolean valid = values.size() == expectedKeys.size(); if (valid) { for (String key : expectedKeys) { @@ -289,7 +284,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt MqttTestClient client = new MqttTestClient(); client.connectAndWait(gatewayAccessToken); client.publishAndWait(topic, payload); - client.disconnect(); + client.disconnectAndWait(); Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class), 20, @@ -346,6 +341,16 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt return "{\"" + deviceA + "\": " + payload + ", \"" + deviceB + "\": " + payload + "}"; } + protected String getGatewayTelemetryJsonPayloadWithMetadata(String deviceA, String deviceB, String firstTsValue, String secondTsValue, String firstMetadata, String secondMetadata) { + String payloadA = "[{\"ts\": " + firstTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + ", \"metadata\":" + firstMetadata + "}, " + + "{\"ts\": " + secondTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + "}]"; + + String payloadB = "[{\"ts\": " + firstTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + ", \"metadata\":" + secondMetadata + "}, " + + "{\"ts\": " + secondTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + "}]"; + + return "{\"" + deviceA + "\": " + payloadA + ", \"" + deviceB + "\": " + payloadB + "}"; + } + private String getTelemetryValuesUrl(DeviceId deviceId, Set actualKeySet) { return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?startTs=0&endTs=25000&keys=" + String.join(",", actualKeySet); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java index 0590d57034..cabdea145c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java @@ -75,7 +75,6 @@ public class MqttTopics { public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE; - public static final String GATEWAY_METRICS_TOPIC = BASE_GATEWAY_API_TOPIC + "/metrics"; // v2 topics public static final String BASE_DEVICE_API_TOPIC_V2 = "v2"; public static final String REQUEST_ID_PATTERN = "(?\\d+)"; diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index f5c7bbb901..6a61531fec 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -74,22 +74,34 @@ public class JsonConverter { private static int maxStringValueLength = 0; public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException { + return convertToTelemetryProto(jsonElement, ts, null); + } + + public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts, List metadataResult) throws JsonSyntaxException { PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); - convertToTelemetry(jsonElement, ts, null, builder); + convertToTelemetry(jsonElement, ts, null, builder, metadataResult); return builder.build(); } + public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, List metadataResult) throws JsonSyntaxException { + return convertToTelemetryProto(jsonElement, System.currentTimeMillis(), metadataResult); + } + public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException { return convertToTelemetryProto(jsonElement, System.currentTimeMillis()); } - private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map> result, PostTelemetryMsg.Builder builder) { + private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map> result, PostTelemetryMsg.Builder builder, List metadataResult) { if (jsonElement.isJsonObject()) { parseObject(systemTs, result, builder, jsonElement.getAsJsonObject()); } else if (jsonElement.isJsonArray()) { jsonElement.getAsJsonArray().forEach(je -> { if (je.isJsonObject()) { - parseObject(systemTs, result, builder, je.getAsJsonObject()); + JsonObject jo = je.getAsJsonObject(); + if (metadataResult != null && jo.has("metadata")) { + metadataResult.add(jo.get("metadata")); + } + parseObject(systemTs, result, builder, jo); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); } @@ -550,7 +562,7 @@ public class JsonConverter { public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws JsonSyntaxException { Map> result = sorted ? new TreeMap<>() : new HashMap<>(); - convertToTelemetry(jsonElement, systemTs, result, null); + convertToTelemetry(jsonElement, systemTs, result, null, null); return result; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 7fad363c6b..d7f64e58ee 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -423,9 +423,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_DISCONNECT_TOPIC: gatewaySessionHandler.onDeviceDisconnect(mqttMsg); break; - case MqttTopics.GATEWAY_METRICS_TOPIC: - gatewaySessionHandler.onGatewayMetrics(mqttMsg); - break; default: ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID); } @@ -1199,7 +1196,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); if (gatewaySessionHandler != null) { - gatewaySessionHandler.onGatewayDisconnect(); + gatewaySessionHandler.onDevicesDisconnect(); } if (sparkplugSessionHandler != null) { // add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp() diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java index c044c89e15..18f0047288 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java @@ -30,6 +30,7 @@ import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -41,7 +42,7 @@ public class GatewayMetricsService { public static final String GATEWAY_METRICS = "gatewayMetrics"; - @Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}") + @Value("${transport.mqtt.gateway_metrics_report_interval_sec:60}") private int metricsReportIntervalSec; @Autowired @@ -57,8 +58,8 @@ public class GatewayMetricsService { scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS); } - public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map data, long ts) { - states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(ts, data); + public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List data, long serverReceiveTs) { + states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(data, serverReceiveTs); } public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) { @@ -72,26 +73,18 @@ public class GatewayMetricsService { states.remove(deviceId); } - public void onDeviceDisconnect(DeviceId deviceId) { - GatewayMetricsState state = states.remove(deviceId); - if (state != null) { - reportMetrics(state, System.currentTimeMillis()); - } - } - public void reportMetrics() { if (states.isEmpty()) { return; } - Map oldStates = states; + Map statesToReport = states; states = new ConcurrentHashMap<>(); long ts = System.currentTimeMillis(); - oldStates.forEach((gatewayId, state) -> { + statesToReport.forEach((gatewayId, state) -> { reportMetrics(state, ts); }); - oldStates.clear(); } private void reportMetrics(GatewayMetricsState state, long ts) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java index 23cc730b86..9829a1d373 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java @@ -15,5 +15,5 @@ */ package org.thingsboard.server.transport.mqtt.gateway.metrics; -public record GatewayMetricsData(long receivedTs, long publishedTs) { +public record GatewayMetricsData(String connector, long receivedTs, long publishedTs) { } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java index 45f945151b..7bec5da6a7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java @@ -19,6 +19,7 @@ import lombok.Getter; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,11 +44,11 @@ public class GatewayMetricsState { this.sessionInfo = sessionInfo; } - public void update(long ts, Map metricsData) { + public void update(List metricsData, long serverReceiveTs) { updateLock.lock(); try { - metricsData.forEach((connectorName, data) -> { - connectors.computeIfAbsent(connectorName, k -> new ConnectorMetricsState()).update(ts, data); + metricsData.forEach(data -> { + connectors.computeIfAbsent(data.connector(), k -> new ConnectorMetricsState()).update(data, serverReceiveTs); }); } finally { updateLock.unlock(); @@ -86,7 +87,7 @@ public class GatewayMetricsState { this.transportLatencySum = new AtomicLong(0); } - private void update(long serverReceiveTs, GatewayMetricsData metricsData) { + private void update(GatewayMetricsData metricsData, long serverReceiveTs) { long gwLatency = metricsData.publishedTs() - metricsData.receivedTs(); long transportLatency = serverReceiveTs - metricsData.publishedTs(); count.incrementAndGet(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 3b3aaef983..f1728a2eac 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -62,6 +62,8 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; +import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import java.util.ArrayList; @@ -114,6 +116,7 @@ public abstract class AbstractGatewaySessionHandler mqttQoSMap; protected final ChannelHandlerContext channel; protected final DeviceSessionCtx deviceSessionCtx; + protected final GatewayMetricsService gatewayMetricsService; @Getter @Setter @@ -131,6 +134,7 @@ public abstract class AbstractGatewaySessionHandler createWeakMap() { @@ -380,7 +384,9 @@ public abstract class AbstractGatewaySessionHandler metadata = new ArrayList<>(); + TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray(), metadata); + processTelemetryMetadataMsg(metadata); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); @@ -388,6 +394,20 @@ public abstract class AbstractGatewaySessionHandler metadata) { + var serverReceiveTs = System.currentTimeMillis(); + var metricsData = metadata.stream() + .filter(JsonElement::isJsonObject) + .map(je -> { + var jo = je.getAsJsonObject(); + var connector = jo.get("connector").getAsString(); + var receivedTs = jo.get("receivedTs").getAsLong(); + var publishedTs = jo.get("publishedTs").getAsLong(); + return new GatewayMetricsData(connector, receivedTs, publishedTs); + }).toList(); + gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs); + } + protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload)); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 9444787193..5dce8e8893 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -15,36 +15,27 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttReasonCodes; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import java.util.Optional; import java.util.UUID; -import static com.amazonaws.util.StringUtils.UTF8; - /** * Created by nickAS21 on 26.12.22 */ @Slf4j public class GatewaySessionHandler extends AbstractGatewaySessionHandler { - private final GatewayMetricsService gatewayMetricsService; - public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { super(deviceSessionCtx, sessionId, overwriteDevicesActivity); - this.gatewayMetricsService = deviceSessionCtx.getContext().getGatewayMetricsService(); } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { @@ -70,11 +61,6 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler deviceProfileOpt) { this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); gatewayMetricsService.onDeviceUpdate(sessionInfo, gateway.getDeviceId()); @@ -84,21 +70,4 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler() {}), ts); - ack(msgId, MqttReasonCodes.PubAck.SUCCESS); - } catch (Throwable t) { - ackOrClose(msgId); - } - } - } diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index d2ac105623..c9fd10a99d 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -147,7 +147,7 @@ transport: disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism # Interval of periodic report of the gateway metrics - gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" + gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:60}" netty: # Netty leak detector level leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"