SparkPlug feature review

This commit is contained in:
Andrii Shvaika 2023-02-08 16:40:47 +02:00
parent 8b42739905
commit 07f757cbb6
7 changed files with 31 additions and 33 deletions

View File

@ -128,7 +128,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final UUID sessionId; private final UUID sessionId;
protected final MqttTransportContext context; protected final MqttTransportContext context;
public final TransportService transportService; private final TransportService transportService;
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final SslHandler sslHandler; private final SslHandler sslHandler;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
@ -142,7 +142,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final ConcurrentHashMap<String, Integer> chunkSizes; private final ConcurrentHashMap<String, Integer> chunkSizes;
private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck; private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
public TopicType attrSubTopicType; private TopicType attrSubTopicType;
private TopicType rpcSubTopicType; private TopicType rpcSubTopicType;
private TopicType attrReqTopicType; private TopicType attrReqTopicType;
private TopicType toServerRpcSubTopicType; private TopicType toServerRpcSubTopicType;

View File

@ -35,12 +35,12 @@ import java.util.concurrent.ConcurrentMap;
* Created by ashvayka on 19.01.17. * Created by ashvayka on 19.01.17.
*/ */
@Slf4j @Slf4j
public abstract class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener { public abstract class AbstractGatewayDeviceSessionContext<T extends AbstractGatewaySessionHandler> extends MqttDeviceAwareSessionContext implements SessionMsgListener {
protected final AbstractGatewaySessionHandler parent; protected final T parent;
private final TransportService transportService; private final TransportService transportService;
public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo, public AbstractGatewayDeviceSessionContext(T parent, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) { TransportService transportService) {
super(UUID.randomUUID(), mqttQoSMap); super(UUID.randomUUID(), mqttQoSMap);

View File

@ -84,13 +84,13 @@ public abstract class AbstractGatewaySessionHandler {
private static final String DEVICE_PROPERTY = "device"; private static final String DEVICE_PROPERTY = "device";
protected final MqttTransportContext context; protected final MqttTransportContext context;
private final TransportService transportService; protected final TransportService transportService;
protected final TransportDeviceInfo gateway; protected final TransportDeviceInfo gateway;
protected final UUID sessionId; protected final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap; private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, MqttDeviceAwareSessionContext> devices; private final ConcurrentMap<String, MqttDeviceAwareSessionContext> devices;
private final ConcurrentMap<String, ListenableFuture<MqttDeviceAwareSessionContext>> deviceFutures; private final ConcurrentMap<String, ListenableFuture<MqttDeviceAwareSessionContext>> deviceFutures;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; protected final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
protected final ChannelHandlerContext channel; protected final ChannelHandlerContext channel;
protected final DeviceSessionCtx deviceSessionCtx; protected final DeviceSessionCtx deviceSessionCtx;
@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler {
new TransportServiceCallback<>() { new TransportServiceCallback<>() {
@Override @Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
GatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ; AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
@ -282,10 +282,7 @@ public abstract class AbstractGatewaySessionHandler {
} }
} }
private GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { protected abstract AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg);
return this.deviceSessionCtx.isSparkplug() ? new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) :
new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
protected int getMsgId(MqttPublishMessage mqttMsg) { protected int getMsgId(MqttPublishMessage mqttMsg) {
return mqttMsg.variableHeader().packetId(); return mqttMsg.variableHeader().packetId();

View File

@ -24,9 +24,9 @@ import java.util.concurrent.ConcurrentMap;
/** /**
* Created by nickAS21 on 26.12.22 * Created by nickAS21 on 26.12.22
*/ */
public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext{ public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext<GatewaySessionHandler> {
public GatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, public GatewayDeviceSessionContext(GatewaySessionHandler parent,
TransportDeviceInfo deviceInfo, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, DeviceProfile deviceProfile,
ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.session;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import java.util.UUID; import java.util.UUID;
@ -48,4 +49,9 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler {
} }
} }
@Override
protected GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) {
return new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
} }

View File

@ -27,13 +27,13 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@Slf4j @Slf4j
public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext<SparkplugNodeSessionHandler> {
public SparkplugDeviceSessionContext(AbstractGatewaySessionHandler parent, public SparkplugDeviceSessionContext(SparkplugNodeSessionHandler parent,
TransportDeviceInfo deviceInfo, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, DeviceProfile deviceProfile,
ConcurrentMap<MqttTopicMatcher, ConcurrentMap<MqttTopicMatcher,
Integer> mqttQoSMap, Integer> mqttQoSMap,
TransportService transportService) { TransportService transportService) {
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
} }
@ -43,12 +43,12 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProto -> { notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(), SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
SparkplugMessageType.DCMD, deviceInfo.getDeviceName()); SparkplugMessageType.DCMD, deviceInfo.getDeviceName());
((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto, parent.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(), sparkplugTopic.toString(),
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(parent::writeAndFlush); .ifPresent(this.parent::writeAndFlush);
} }
}); });
} }

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@ -149,22 +150,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
// SUBSCRIBE Node // SUBSCRIBE Node
parent.processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1); parent.processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1);
} else { } else {
// SUBSCRIBE Device // SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED.
onSparkplugDeviceSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1, sparkplugTopic.getDeviceId()); // TODO: track that node subscribed to # or to particular device.
} }
} }
public void onSparkplugDeviceSubscribe(List<Integer> grantedQoSList, String topic,
MqttQoS reqQoS, TopicType topicType, String deviceName)
throws AdaptorException, ThingsboardException, ExecutionException, InterruptedException {
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = onDeviceConnectProto(deviceName);
parent.transportService.process(contextListenableFuture.get().getSessionInfo(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
parent.attrSubTopicType = topicType;
parent.registerSubQoS(topic, grantedQoSList, reqQoS);
}
public void onDeviceDisconnect(MqttPublishMessage mqttMsg, String deviceName) throws AdaptorException { public void onDeviceDisconnect(MqttPublishMessage mqttMsg, String deviceName) throws AdaptorException {
try { try {
processOnDisconnect(mqttMsg, deviceName); processOnDisconnect(mqttMsg, deviceName);
@ -253,4 +243,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
return Optional.empty(); return Optional.empty();
} }
@Override
protected SparkplugDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) {
return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
} }