diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e58e2c5d89..69f5f78475 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -399,7 +399,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case NBIRTH: case NCMD: case NDATA: - sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, sparkplugTopic); break; default: } @@ -410,7 +410,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case DBIRTH: case DCMD: case DDATA: - sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic.getDeviceId(), sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic); break; case DDEATH: sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId()); @@ -1067,8 +1067,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); - sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, - deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); + sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); sessionMetaData.setOverwriteActivityTime(true); } else { log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName()); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index fc480fa882..9b1e3e664e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -101,7 +101,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler contextListenableFuture; @@ -113,7 +114,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler deviceCtx = onDeviceConnectProto(deviceName); + ListenableFuture deviceCtx = onDeviceConnectProto(topic); contextListenableFuture = Futures.transform(deviceCtx, ctx -> { if (topic.isType(DBIRTH)) { sendSparkplugStateOnTelemetry(ctx.getSessionInfo(), deviceName, ONLINE, @@ -218,10 +219,10 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler onDeviceConnectProto(String deviceName) throws ThingsboardException { + private ListenableFuture onDeviceConnectProto(SparkplugTopic topic) throws ThingsboardException { try { String deviceType = this.gateway.getDeviceType() + "-node"; - return onDeviceConnect(deviceName, deviceType); + return onDeviceConnect(topic.getNodeDeviceName(), deviceType); } catch (RuntimeException e) { log.error("Failed Sparkplug Device connect proto!", e); throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java index be721f11bb..2fc49e2b69 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java @@ -161,4 +161,9 @@ public class SparkplugTopic { public boolean isNode() { return this.deviceId == null; } + + public String getNodeDeviceName() { + return isNode() ? edgeNodeId : deviceId; + } } +