From 55b6b8c5645e3fd9748ecae83f9b4800dc59baf3 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 13 Sep 2024 10:02:06 +0200 Subject: [PATCH] added one more filtering for metrics data --- .../mqtt/session/AbstractGatewaySessionHandler.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index f1728a2eac..f963a78135 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -398,14 +398,19 @@ public abstract class AbstractGatewaySessionHandler { - var jo = je.getAsJsonObject(); + .map(JsonElement::getAsJsonObject) + .filter(jo -> jo.has("connector") + && jo.has("receivedTs") + && jo.has("publishedTs")) + .map(jo -> { var connector = jo.get("connector").getAsString(); var receivedTs = jo.get("receivedTs").getAsLong(); var publishedTs = jo.get("publishedTs").getAsLong(); return new GatewayMetricsData(connector, receivedTs, publishedTs); }).toList(); - gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs); + if (!metadata.isEmpty()) { + gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs); + } } protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {