From a01c7a23d72a828e315a6fb13dfc9427e8bbcb84 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Fri, 27 Jan 2023 18:53:35 +0200 Subject: [PATCH] sparkplug: add onAttributesUpdate --- .../transport/mqtt/MqttTransportHandler.java | 17 ++++++++------ .../AbstractGatewayDeviceSessionContext.java | 2 +- .../AbstractGatewaySessionHandler.java | 7 +++++- .../session/SparkplugNodeSessionHandler.java | 5 ++++- .../mqtt/session/SparkplugSessionCtx.java | 22 +++++++++++++++++++ 5 files changed, 43 insertions(+), 10 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2b6e68258d..747c26e6fd 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1062,21 +1062,24 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void checkSparkplugSession(MqttConnectMessage connectMessage) { try { - if (sparkplugSessionHandler == null) { - sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId); - if (StringUtils.isNotBlank(connectMessage.payload().willTopic()) - && connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) { + if (sparkplugSessionHandler == null && validatedSparkplugTopic(connectMessage)) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - SparkplugTopic sparkplugTopic = parseTopicPublish(connectMessage.payload().willTopic()); + SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic()); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopicNode); sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, - deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic); - } + deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); } } catch (Exception e) { log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); } } + private boolean validatedSparkplugTopic (MqttConnectMessage connectMessage) { + return StringUtils.isNotBlank(connectMessage.payload().willTopic()) + && connectMessage.payload().willMessageInBytes() != null + && connectMessage.payload().willMessageInBytes().length > 0; + } + @Override public void operationComplete(Future future) throws Exception { log.trace("[{}] Channel closed!", sessionId); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java index 695873f677..ab3a1f5e4f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java @@ -37,7 +37,7 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { - private final AbstractGatewaySessionHandler parent; + protected final AbstractGatewaySessionHandler parent; private final TransportService transportService; public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo, diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 9ea5cbb00f..bf0d9230db 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler { new TransportServiceCallback<>() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - GatewayDeviceSessionContext deviceSessionCtx = new GatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ; if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); @@ -282,6 +282,11 @@ public abstract class AbstractGatewaySessionHandler { } } + private AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { + return this.deviceSessionCtx.isSparkplug() ? new SparkplugSessionCtx(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) : + new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + } + protected int getMsgId(MqttPublishMessage mqttMsg) { return mqttMsg.variableHeader().packetId(); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 9ebc43e0d6..d36d6ba2f4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -52,8 +52,11 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi @Slf4j public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { - public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) { + final SparkplugTopic sparkplugTopicNode; + + public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, SparkplugTopic sparkplugTopicNode) { super(deviceSessionCtx, sessionId); + this.sparkplugTopicNode = sparkplugTopicNode; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java index 2b02571b4e..9a87dbfe84 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java @@ -15,11 +15,15 @@ */ package org.thingsboard.server.transport.mqtt.session; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; /** @@ -36,4 +40,22 @@ public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext { super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); } + @Override + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { + log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); + createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush); + } + + private Optional createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notification) { + try { + // TODO metrics from notification & MetricsDBIRTH + byte[] payloadInBytes = new byte[3]; + String topic = ((SparkplugNodeSessionHandler) parent).sparkplugTopicNode.toString() + "/" + deviceInfo.getDeviceName(); + return Optional.of(parent.getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes)); + } catch (Exception e) { + log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e); + return Optional.empty(); + } + } + }