From 1d49e56aec93e27d0c85fa42dd72870c228e5af7 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 26 Jan 2023 19:06:42 +0200 Subject: [PATCH] sparkplug: add Metrics birth --- .../transport/mqtt/MqttTransportHandler.java | 18 ++++++++----- .../MqttDeviceAwareSessionContext.java | 12 +++++++++ .../session/SparkplugNodeSessionHandler.java | 26 ++++++++++++++----- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2a4dcbae16..2b6e68258d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -384,6 +384,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId(); if (sparkplugTopic.isNode()) { // A node topic + SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); switch (sparkplugTopic.getType()) { case STATE: // TODO @@ -391,8 +392,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case NBIRTH: case NCMD: case NDATA: - SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); - sparkplugSessionHandler.onDeviceTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic.getType().name(), sparkplugTopic.isNode()); + sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; case NDEATH: sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); @@ -404,15 +404,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } else { // A device topic + SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); switch (sparkplugTopic.getType()) { case STATE: // TODO break; + case DBIRTH: + + sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); + System.out.println(); + break; case DCMD: case DDATA: - case DBIRTH: - SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); - sparkplugSessionHandler.onDeviceTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic.getType().name(), sparkplugTopic.isNode()); + sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); break; case DDEATH: sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); @@ -1064,8 +1068,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement && connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); SparkplugTopic sparkplugTopic = parseTopicPublish(connectMessage.payload().willTopic()); - sparkplugSessionHandler.onDeviceTelemetryProto(0, sparkplugBProtoNode, - deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic.getType().name(), true); + sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, + deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic); } } } catch (Exception e) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index 8c8437be51..ded99daf60 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -17,9 +17,12 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.handler.codec.mqtt.MqttQoS; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -30,6 +33,7 @@ import java.util.stream.Collectors; public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { private final ConcurrentMap mqttQoSMap; + private final Set metricBirth = new HashSet<>(); public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap mqttQoSMap) { super(sessionId); @@ -40,6 +44,14 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo return mqttQoSMap; } + public Set getMetricBirth() { + return metricBirth; + } + + public void setMetricBirth(java.util.List metrics) { + this.metricBirth.addAll(metrics); + } + public MqttQoS getQoSForTopic(String topic) { List qosList = mqttQoSMap.entrySet() .stream() diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 9c0351b189..9ebc43e0d6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -39,8 +39,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getFromSparkplugBMetricToKeyValueProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe; @@ -67,13 +69,25 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onDeviceTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, String topicTypeName, boolean isNode) throws AdaptorException { - try { - checkDeviceName(deviceName); - List msgs = convertToPostTelemetry(sparkplugBProto, topicTypeName); - int finalMsgId = msgId; - ListenableFuture contextListenableFuture = isNode ? + public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException { + checkDeviceName(deviceName); + ListenableFuture contextListenableFuture = topic.isNode() ? Futures.immediateFuture(this.deviceSessionCtx) : checkDeviceConnected(deviceName); + List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); + if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { + try { + contextListenableFuture.get().setMetricBirth(sparkplugBProto.getMetricsList()); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed add Metrics. MessageType *BIRTH.", e); + } + } + onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName); + } + + public void onDeviceTelemetryProto(ListenableFuture contextListenableFuture, + int msgId, List msgs, String deviceName) throws AdaptorException { + try { + int finalMsgId = msgId; for (TransportProtos.PostTelemetryMsg msg : msgs) { Futures.addCallback(contextListenableFuture, new FutureCallback<>() {