added one more filtering for metrics data

This commit is contained in:
YevhenBondarenko 2024-09-13 10:02:06 +02:00
parent 137b1732bb
commit 55b6b8c564

View File

@ -398,15 +398,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
var serverReceiveTs = System.currentTimeMillis(); var serverReceiveTs = System.currentTimeMillis();
var metricsData = metadata.stream() var metricsData = metadata.stream()
.filter(JsonElement::isJsonObject) .filter(JsonElement::isJsonObject)
.map(je -> { .map(JsonElement::getAsJsonObject)
var jo = je.getAsJsonObject(); .filter(jo -> jo.has("connector")
&& jo.has("receivedTs")
&& jo.has("publishedTs"))
.map(jo -> {
var connector = jo.get("connector").getAsString(); var connector = jo.get("connector").getAsString();
var receivedTs = jo.get("receivedTs").getAsLong(); var receivedTs = jo.get("receivedTs").getAsLong();
var publishedTs = jo.get("publishedTs").getAsLong(); var publishedTs = jo.get("publishedTs").getAsLong();
return new GatewayMetricsData(connector, receivedTs, publishedTs); return new GatewayMetricsData(connector, receivedTs, publishedTs);
}).toList(); }).toList();
if (!metadata.isEmpty()) {
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs); gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs);
} }
}
protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {
try { try {