diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index eb1c688950..2953183027 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -128,7 +128,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final UUID sessionId; protected final MqttTransportContext context; - public final TransportService transportService; + private final TransportService transportService; private final SchedulerComponent scheduler; private final SslHandler sslHandler; private final ConcurrentMap mqttQoSMap; @@ -142,7 +142,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final ConcurrentHashMap chunkSizes; private final ConcurrentMap rpcAwaitingAck; - public TopicType attrSubTopicType; + private TopicType attrSubTopicType; private TopicType rpcSubTopicType; private TopicType attrReqTopicType; private TopicType toServerRpcSubTopicType; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java index ab3a1f5e4f..9c133c2457 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewayDeviceSessionContext.java @@ -35,12 +35,12 @@ import java.util.concurrent.ConcurrentMap; * Created by ashvayka on 19.01.17. */ @Slf4j -public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { +public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { - protected final AbstractGatewaySessionHandler parent; + protected final T parent; private final TransportService transportService; - public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo, + public AbstractGatewayDeviceSessionContext(T parent, TransportDeviceInfo deviceInfo, DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, TransportService transportService) { super(UUID.randomUUID(), mqttQoSMap); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index a6361978b3..66c5823f11 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -84,13 +84,13 @@ public abstract class AbstractGatewaySessionHandler { private static final String DEVICE_PROPERTY = "device"; protected final MqttTransportContext context; - private final TransportService transportService; + protected final TransportService transportService; protected final TransportDeviceInfo gateway; protected final UUID sessionId; private final ConcurrentMap deviceCreationLockMap; private final ConcurrentMap devices; private final ConcurrentMap> deviceFutures; - private final ConcurrentMap mqttQoSMap; + protected final ConcurrentMap mqttQoSMap; protected final ChannelHandlerContext channel; protected final DeviceSessionCtx deviceSessionCtx; @@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler { new TransportServiceCallback<>() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - GatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ; + AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); @@ -282,10 +282,7 @@ public abstract class AbstractGatewaySessionHandler { } } - private GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { - return this.deviceSessionCtx.isSparkplug() ? new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) : - new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); - } + protected abstract AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg); protected int getMsgId(MqttPublishMessage mqttMsg) { return mqttMsg.variableHeader().packetId(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java index 4c3a20adda..045723555c 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionContext.java @@ -24,9 +24,9 @@ import java.util.concurrent.ConcurrentMap; /** * Created by nickAS21 on 26.12.22 */ -public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext{ +public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext { - public GatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, + public GatewayDeviceSessionContext(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 9d1834f358..c1c82efd15 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import java.util.UUID; @@ -48,4 +49,9 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler { } } + @Override + protected GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { + return new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index 4416501dea..06a398b9c1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -27,13 +27,13 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; @Slf4j -public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ +public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext { - public SparkplugDeviceSessionContext(AbstractGatewaySessionHandler parent, + public SparkplugDeviceSessionContext(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo, DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + Integer> mqttQoSMap, TransportService transportService) { super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); } @@ -43,12 +43,12 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); notification.getSharedUpdatedList().forEach(tsKvProto -> { if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { - SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(), + SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), SparkplugMessageType.DCMD, deviceInfo.getDeviceName()); - ((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto, + parent.createSparkplugMqttPublishMsg(tsKvProto, sparkplugTopic.toString(), getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) - .ifPresent(parent::writeAndFlush); + .ifPresent(this.parent::writeAndFlush); } }); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index a43e64be0d..ef10c3234e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; @@ -149,22 +150,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { // SUBSCRIBE Node parent.processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1); } else { - // SUBSCRIBE Device - onSparkplugDeviceSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1, sparkplugTopic.getDeviceId()); + // SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED. + // TODO: track that node subscribed to # or to particular device. } } - public void onSparkplugDeviceSubscribe(List grantedQoSList, String topic, - MqttQoS reqQoS, TopicType topicType, String deviceName) - throws AdaptorException, ThingsboardException, ExecutionException, InterruptedException { - checkDeviceName(deviceName); - ListenableFuture contextListenableFuture = onDeviceConnectProto(deviceName); - parent.transportService.process(contextListenableFuture.get().getSessionInfo(), - TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); - parent.attrSubTopicType = topicType; - parent.registerSubQoS(topic, grantedQoSList, reqQoS); - } - public void onDeviceDisconnect(MqttPublishMessage mqttMsg, String deviceName) throws AdaptorException { try { processOnDisconnect(mqttMsg, deviceName); @@ -253,4 +243,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { return Optional.empty(); } + @Override + protected SparkplugDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { + return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); + } + }