sparkplug: add getNodeDeviceName()

This commit is contained in:
nickAS21 2023-03-20 17:17:51 +02:00
parent c6dbdeb23d
commit d8e97dd74d
3 changed files with 13 additions and 8 deletions

View File

@ -399,7 +399,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case NBIRTH:
case NCMD:
case NDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, sparkplugTopic);
break;
default:
}
@ -410,7 +410,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case DBIRTH:
case DCMD:
case DDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic.getDeviceId(), sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic);
break;
case DDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId());
@ -1067,8 +1067,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (sparkplugTopicNode != null) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode);
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode,
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode);
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode);
sessionMetaData.setOverwriteActivityTime(true);
} else {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName());

View File

@ -101,7 +101,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
String deviceName = topic.getNodeDeviceName();
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture;
@ -113,7 +114,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
contextListenableFuture = Futures.immediateFuture(this.deviceSessionCtx);
} else {
ListenableFuture<SparkplugDeviceSessionContext> deviceCtx = onDeviceConnectProto(deviceName);
ListenableFuture<SparkplugDeviceSessionContext> deviceCtx = onDeviceConnectProto(topic);
contextListenableFuture = Futures.transform(deviceCtx, ctx -> {
if (topic.isType(DBIRTH)) {
sendSparkplugStateOnTelemetry(ctx.getSessionInfo(), deviceName, ONLINE,
@ -218,10 +219,10 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
private ListenableFuture<SparkplugDeviceSessionContext> onDeviceConnectProto(String deviceName) throws ThingsboardException {
private ListenableFuture<SparkplugDeviceSessionContext> onDeviceConnectProto(SparkplugTopic topic) throws ThingsboardException {
try {
String deviceType = this.gateway.getDeviceType() + "-node";
return onDeviceConnect(deviceName, deviceType);
return onDeviceConnect(topic.getNodeDeviceName(), deviceType);
} catch (RuntimeException e) {
log.error("Failed Sparkplug Device connect proto!", e);
throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS);

View File

@ -161,4 +161,9 @@ public class SparkplugTopic {
public boolean isNode() {
return this.deviceId == null;
}
public String getNodeDeviceName() {
return isNode() ? edgeNodeId : deviceId;
}
}