sparkplug: add Metrics birth

This commit is contained in:
nickAS21 2023-01-26 19:06:42 +02:00
parent f9bf73f896
commit 1d49e56aec
3 changed files with 43 additions and 13 deletions

View File

@ -384,6 +384,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId(); String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId();
if (sparkplugTopic.isNode()) { if (sparkplugTopic.isNode()) {
// A node topic // A node topic
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload()));
switch (sparkplugTopic.getType()) { switch (sparkplugTopic.getType()) {
case STATE: case STATE:
// TODO // TODO
@ -391,8 +392,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case NBIRTH: case NBIRTH:
case NCMD: case NCMD:
case NDATA: case NDATA:
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
sparkplugSessionHandler.onDeviceTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic.getType().name(), sparkplugTopic.isNode());
break; break;
case NDEATH: case NDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); sparkplugSessionHandler.onDeviceDisconnect(mqttMsg);
@ -404,15 +404,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} else { } else {
// A device topic // A device topic
SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload()));
switch (sparkplugTopic.getType()) { switch (sparkplugTopic.getType()) {
case STATE: case STATE:
// TODO // TODO
break; break;
case DBIRTH:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
System.out.println();
break;
case DCMD: case DCMD:
case DDATA: case DDATA:
case DBIRTH: sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload()));
sparkplugSessionHandler.onDeviceTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic.getType().name(), sparkplugTopic.isNode());
break; break;
case DDEATH: case DDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); sparkplugSessionHandler.onDeviceDisconnect(mqttMsg);
@ -1064,8 +1068,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
&& connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) { && connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
SparkplugTopic sparkplugTopic = parseTopicPublish(connectMessage.payload().willTopic()); SparkplugTopic sparkplugTopic = parseTopicPublish(connectMessage.payload().willTopic());
sparkplugSessionHandler.onDeviceTelemetryProto(0, sparkplugBProtoNode, sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode,
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic.getType().name(), true); deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic);
} }
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -17,9 +17,12 @@ package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -30,6 +33,7 @@ import java.util.stream.Collectors;
public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final Set<SparkplugBProto.Payload.Metric> metricBirth = new HashSet<>();
public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
super(sessionId); super(sessionId);
@ -40,6 +44,14 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo
return mqttQoSMap; return mqttQoSMap;
} }
public Set<SparkplugBProto.Payload.Metric> getMetricBirth() {
return metricBirth;
}
public void setMetricBirth(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricBirth.addAll(metrics);
}
public MqttQoS getQoSForTopic(String topic) { public MqttQoS getQoSForTopic(String topic) {
List<Integer> qosList = mqttQoSMap.entrySet() List<Integer> qosList = mqttQoSMap.entrySet()
.stream() .stream()

View File

@ -39,8 +39,10 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getFromSparkplugBMetricToKeyValueProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getFromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe;
@ -67,13 +69,25 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
} }
} }
public void onDeviceTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, String topicTypeName, boolean isNode) throws AdaptorException { public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException {
try { checkDeviceName(deviceName);
checkDeviceName(deviceName); ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topicTypeName);
int finalMsgId = msgId;
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = isNode ?
Futures.immediateFuture(this.deviceSessionCtx) : checkDeviceConnected(deviceName); Futures.immediateFuture(this.deviceSessionCtx) : checkDeviceConnected(deviceName);
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
contextListenableFuture.get().setMetricBirth(sparkplugBProto.getMetricsList());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed add Metrics. MessageType *BIRTH.", e);
}
}
onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName);
}
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
int msgId, List<TransportProtos.PostTelemetryMsg> msgs, String deviceName) throws AdaptorException {
try {
int finalMsgId = msgId;
for (TransportProtos.PostTelemetryMsg msg : msgs) { for (TransportProtos.PostTelemetryMsg msg : msgs) {
Futures.addCallback(contextListenableFuture, Futures.addCallback(contextListenableFuture,
new FutureCallback<>() { new FutureCallback<>() {