From 48b5cadb4a61f2cb6a3c10c656a67d420745ec67 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 13 Sep 2024 15:30:37 +0200 Subject: [PATCH] created separate converter methow for gateway telemetry --- ...AbstractMqttTimeseriesIntegrationTest.java | 6 +- .../msg/gateway/metrics/GatewayMetadata.java} | 4 +- .../server/common/adaptor/JsonConverter.java | 62 ++++++++++++++----- .../mqtt/gateway/GatewayMetricsService.java | 4 +- .../gateway/metrics/GatewayMetricsState.java | 5 +- .../AbstractGatewaySessionHandler.java | 32 +++------- 6 files changed, 67 insertions(+), 46 deletions(-) rename common/{transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java => message/src/main/java/org/thingsboard/server/common/msg/gateway/metrics/GatewayMetadata.java} (80%) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index f5e21e9e18..5fd68417dd 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; -import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; @@ -176,10 +176,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt for (int i = 0; i < n; i++) { long publishedTs = System.currentTimeMillis() - 10; long gatewayLatencyA = random.nextLong(100, 500); - var firstData = new GatewayMetricsData("connectorA", publishedTs - gatewayLatencyA, publishedTs); + var firstData = new GatewayMetadata("connectorA", publishedTs - gatewayLatencyA, publishedTs); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); long gatewayLatencyB = random.nextLong(120, 450); - var secondData = new GatewayMetricsData("connectorB", publishedTs - gatewayLatencyB, publishedTs); + var secondData = new GatewayMetadata("connectorB", publishedTs - gatewayLatencyB, publishedTs); gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/gateway/metrics/GatewayMetadata.java similarity index 80% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/gateway/metrics/GatewayMetadata.java index 9829a1d373..cc905f9383 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsData.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/gateway/metrics/GatewayMetadata.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.transport.mqtt.gateway.metrics; +package org.thingsboard.server.common.msg.gateway.metrics; -public record GatewayMetricsData(String connector, long receivedTs, long publishedTs) { +public record GatewayMetadata(String connector, long receivedTs, long publishedTs) { } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index 6a61531fec..bffe59fa83 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; @@ -74,33 +76,65 @@ public class JsonConverter { private static int maxStringValueLength = 0; public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException { - return convertToTelemetryProto(jsonElement, ts, null); - } - - public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts, List metadataResult) throws JsonSyntaxException { PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); - convertToTelemetry(jsonElement, ts, null, builder, metadataResult); + convertToTelemetry(jsonElement, ts, null, builder); return builder.build(); } - public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, List metadataResult) throws JsonSyntaxException { - return convertToTelemetryProto(jsonElement, System.currentTimeMillis(), metadataResult); - } - public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException { return convertToTelemetryProto(jsonElement, System.currentTimeMillis()); } - private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map> result, PostTelemetryMsg.Builder builder, List metadataResult) { + public static TbPair> convertToGatewayTelemetry(JsonElement jsonElement, long systemTs) { + List metadataResult = null; + PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); + if (jsonElement.isJsonArray()) { + var ja = jsonElement.getAsJsonArray(); + for (int i = 0; i < ja.size(); i++) { + var je = ja.get(i); + if (je.isJsonObject()) { + JsonObject jo = je.getAsJsonObject(); + JsonElement metadataElem = jo.remove("metadata"); + if (metadataElem != null) { + if (metadataResult == null) { + metadataResult = new ArrayList<>(); + } + if (metadataElem.isJsonObject()) { + JsonObject metadataObj = metadataElem.getAsJsonObject(); + var connector = getAndValidateMetadataElement(metadataObj, "connector").getAsString(); + var receivedTs = getAndValidateMetadataElement(metadataObj, "receivedTs").getAsLong(); + var publishedTs = getAndValidateMetadataElement(metadataObj, "publishedTs").getAsLong(); + metadataResult.add(new GatewayMetadata(connector, receivedTs, publishedTs)); + } else { + throw new JsonSyntaxException("Can't parse gateway metadata: " + metadataElem); + } + } + parseObject(systemTs, null, builder, jo); + } else { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); + } + } + } else { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement); + } + return TbPair.of(builder.build(), metadataResult); + } + + private static JsonElement getAndValidateMetadataElement(JsonObject metadata, String elementName) { + var element = metadata.get(elementName); + if (element == null || element.isJsonNull()) { + throw new JsonSyntaxException(String.format("Can't parse gateway element in metadata: [%s][%s]", metadata, elementName)); + } + return element; + } + + private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map> result, PostTelemetryMsg.Builder builder) { if (jsonElement.isJsonObject()) { parseObject(systemTs, result, builder, jsonElement.getAsJsonObject()); } else if (jsonElement.isJsonArray()) { jsonElement.getAsJsonArray().forEach(je -> { if (je.isJsonObject()) { JsonObject jo = je.getAsJsonObject(); - if (metadataResult != null && jo.has("metadata")) { - metadataResult.add(jo.get("metadata")); - } parseObject(systemTs, result, builder, jo); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); @@ -562,7 +596,7 @@ public class JsonConverter { public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws JsonSyntaxException { Map> result = sorted ? new TreeMap<>() : new HashMap<>(); - convertToTelemetry(jsonElement, systemTs, result, null, null); + convertToTelemetry(jsonElement, systemTs, result, null); return result; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java index 18f0047288..4f67866d06 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/GatewayMetricsService.java @@ -27,7 +27,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; -import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import java.util.List; @@ -58,7 +58,7 @@ public class GatewayMetricsService { scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS); } - public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List data, long serverReceiveTs) { + public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List data, long serverReceiveTs) { states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(data, serverReceiveTs); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java index 7bec5da6a7..a800c78e28 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/gateway/metrics/GatewayMetricsState.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.gateway.metrics; import lombok.Getter; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.HashMap; @@ -44,7 +45,7 @@ public class GatewayMetricsState { this.sessionInfo = sessionInfo; } - public void update(List metricsData, long serverReceiveTs) { + public void update(List metricsData, long serverReceiveTs) { updateLock.lock(); try { metricsData.forEach(data -> { @@ -87,7 +88,7 @@ public class GatewayMetricsState { this.transportLatencySum = new AtomicLong(0); } - private void update(GatewayMetricsData metricsData, long serverReceiveTs) { + private void update(GatewayMetadata metricsData, long serverReceiveTs) { long gwLatency = metricsData.publishedTs() - metricsData.receivedTs(); long transportLatency = serverReceiveTs - metricsData.publishedTs(); count.incrementAndGet(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 483522f85c..2e205ff840 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -48,6 +48,8 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; @@ -63,7 +65,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; -import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import java.util.ArrayList; @@ -384,9 +385,13 @@ public abstract class AbstractGatewaySessionHandler metadata = new ArrayList<>(); - TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray(), metadata); - processTelemetryMetadataMsg(metadata); + long systemTs = System.currentTimeMillis(); + TbPair> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs); + TransportProtos.PostTelemetryMsg postTelemetryMsg = gatewayPayloadPair.getFirst(); + List metadata = gatewayPayloadPair.getSecond(); + if (!CollectionUtils.isEmpty(metadata)) { + gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metadata, systemTs); + } transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); } catch (Throwable e) { log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); @@ -394,25 +399,6 @@ public abstract class AbstractGatewaySessionHandler metadata) { - var serverReceiveTs = System.currentTimeMillis(); - var metricsData = metadata.stream() - .filter(JsonElement::isJsonObject) - .map(JsonElement::getAsJsonObject) - .filter(jo -> jo.has("connector") - && jo.has("receivedTs") - && jo.has("publishedTs")) - .map(jo -> { - var connector = jo.get("connector").getAsString(); - var receivedTs = jo.get("receivedTs").getAsLong(); - var publishedTs = jo.get("publishedTs").getAsLong(); - return new GatewayMetricsData(connector, receivedTs, publishedTs); - }).toList(); - if (!metricsData.isEmpty()) { - gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs); - } - } - protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { try { TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload));