From d73ab5f406343f06df6694c1dc0fab2012b2854b Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Tue, 27 Dec 2022 16:43:53 +0200 Subject: [PATCH] sparkplug: add Sparkplug as extends abstract --- .../AbstractGatewaySessionHandler.java | 4 +- .../session/SparkplugNodeSessionHandler.java | 272 +----------------- .../mqtt/session/SparkplugSessionCtx.java | 119 +------- .../session/GatewaySessionHandlerTest.java | 2 +- 4 files changed, 21 insertions(+), 376 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index fc7b205be5..28058be428 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -114,7 +114,7 @@ public abstract class AbstractGatewaySessionHandler { if (isJsonPayloadType()) { onDeviceDisconnectJson(mqttMsg); } else { - onDeviceDisconnectProto(mqttMsg); + onGatewayNodeDisconnectProto(mqttMsg); } } @@ -308,7 +308,7 @@ public abstract class AbstractGatewaySessionHandler { processOnDisconnect(msg, deviceName); } - private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { + protected void onGatewayNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { try { TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); String deviceName = checkDeviceName(connectProto.getDeviceName()); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index f63b899515..1e15d7468a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -15,139 +15,34 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.gson.JsonElement; -import com.google.gson.JsonNull; -import com.google.protobuf.InvalidProtocolBufferException; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.springframework.util.ConcurrentReferenceHashMap; -import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; -import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; -import org.thingsboard.server.gen.transport.TransportApiProtos; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; -import org.thingsboard.server.transport.mqtt.MqttTransportContext; -import org.thingsboard.server.transport.mqtt.MqttTransportHandler; -import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; -import javax.annotation.Nullable; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopic; /** * Created by nickAS21 on 12.12.22 */ @Slf4j -public class SparkplugNodeSessionHandler { +public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler{ - private static final String DEFAULT_DEVICE_TYPE = "default"; - private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; - private static final String DEVICE_PROPERTY = "device"; - private final MqttTransportContext context; - private final TransportService transportService; - private final TransportDeviceInfo nodeSparkplugInfo; - private final UUID sessionId; - private final ConcurrentMap deviceCreationLockMap; - private final ConcurrentMap devices = new ConcurrentHashMap<>(); - private final ConcurrentMap> deviceFutures = new ConcurrentHashMap<>(); - private final ConcurrentMap mqttQoSMap; - private final ChannelHandlerContext channel; - private final DeviceSessionCtx deviceSessionCtx; private String nodeTopic; public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, String nodeTopic) { - this.context = deviceSessionCtx.getContext(); - this.transportService = context.getTransportService(); - this.deviceSessionCtx = deviceSessionCtx; - this.nodeSparkplugInfo = deviceSessionCtx.getDeviceInfo(); - this.sessionId = sessionId; - this.deviceCreationLockMap = createWeakMap(); - this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); - this.channel = deviceSessionCtx.getChannel(); + super(deviceSessionCtx, sessionId); this.nodeTopic = nodeTopic; } - ConcurrentReferenceHashMap createWeakMap() { - return new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); - } - - public String getNodeId() { - return context.getNodeId(); - } - - public UUID getSessionId() { - return sessionId; - } - - public String getNodeTopic() { - return nodeTopic; - } - - public int nextMsgId() { - return deviceSessionCtx.nextMsgId(); - } - - public void deregisterSession(String deviceName) { - SparkplugSessionCtx deviceSessionCtx = devices.remove(deviceName); - if (deviceSessionCtx != null) { - deregisterSession(deviceName, deviceSessionCtx); - } else { - log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName); - } - } - - private void deregisterSession(String deviceName, SparkplugSessionCtx deviceSessionCtx) { - transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); - log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); - } - - public void onDeviceDeleted(String deviceName) { - deregisterSession(deviceName); - } - - private int getMsgId(MqttPublishMessage mqttMsg) { - return mqttMsg.variableHeader().packetId(); - } - - public void onDeviceConnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { - try { - String deviceName = parseTopic(mqttMsg.variableHeader().topicName()).getDeviceId(); - String deviceType = StringUtils.isEmpty(nodeSparkplugInfo.getDeviceType()) ? DEFAULT_DEVICE_TYPE : nodeSparkplugInfo.getDeviceType(); - processOnConnect(mqttMsg, deviceName, deviceType); - } catch (Exception e) { - throw new AdaptorException(e); - } - } - public void onPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) throws Exception { SparkplugTopic sparkplugTopic = parseTopic(topicName); - log.error("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + log.warn("SparkplugPublishMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); if (sparkplugTopic.isNode()) { // A node topic switch (sparkplugTopic.getType()) { @@ -164,7 +59,7 @@ public class SparkplugNodeSessionHandler { // TODO break; case NDEATH: - onNodeDisconnectProto(mqttMsg); + onGatewayNodeDisconnectProto(mqttMsg); break; case NRECORD: // TODO @@ -187,7 +82,7 @@ public class SparkplugNodeSessionHandler { // TODO break; case DDEATH: - onDeviceDisconnectProto(mqttMsg); + onGatewayNodeDisconnectProto(mqttMsg); break; case DRECORD: // TODO @@ -197,36 +92,16 @@ public class SparkplugNodeSessionHandler { } } - private void onNodeDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { - try { - TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); - String deviceName = checkDeviceName(connectProto.getDeviceName()); - processOnDisconnect(mqttMsg, deviceName); - } catch (RuntimeException | InvalidProtocolBufferException e) { - throw new AdaptorException(e); - } - } - - private void onDeviceDisconnectProto(MqttPublishMessage mqttMsg) throws AdaptorException { - try { - TransportApiProtos.DisconnectMsg connectProto = TransportApiProtos.DisconnectMsg.parseFrom(getBytes(mqttMsg.payload())); - String deviceName = checkDeviceName(connectProto.getDeviceName()); - // TODO disconnect device without disconnect Node - } catch (RuntimeException | InvalidProtocolBufferException e) { - throw new AdaptorException(e); - } - } - - private void processOnDisconnect(MqttPublishMessage msg, String deviceName) { - deregisterSession(deviceName); - ack(msg); - } - public void handleSparkplugSubscribeMsg(List grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) { String topicName = sparkplugTopic.toString(); - log.error("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); + log.warn("SparkplugSubscribeMsg [{}] [{}]", sparkplugTopic.isNode() ? "node" : "device: " + sparkplugTopic.getDeviceId(), sparkplugTopic.getType()); - if (sparkplugTopic.isNode()) { + if (sparkplugTopic.getGroupId() == null) { + // TODO SUBSCRIBE NameSpace + } else if (sparkplugTopic.getType() == null) { + // TODO SUBSCRIBE GroupId + } + else if (sparkplugTopic.isNode()) { // A node topic switch (sparkplugTopic.getType()) { case STATE: @@ -275,127 +150,4 @@ public class SparkplugNodeSessionHandler { } } - - private byte[] getBytes(ByteBuf payload) { - return ProtoMqttAdaptor.toBytes(payload); - } - - private void ack(MqttPublishMessage msg) { - int msgId = getMsgId(msg); - if (msgId > 0) { - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId)); - } - } - - ChannelFuture writeAndFlush(MqttMessage mqttMessage) { - return channel.writeAndFlush(mqttMessage); - } - - private String checkDeviceName(String deviceName) { - if (StringUtils.isEmpty(deviceName)) { - throw new RuntimeException("Device name is empty!"); - } else { - return deviceName; - } - } - - private String getDeviceName(JsonElement json) { - return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString(); - } - - - private String getDeviceType(JsonElement json) { - JsonElement type = json.getAsJsonObject().get("type"); - return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString(); - } - - - private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) { - log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName); - Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable SparkplugSessionCtx result) { - ack(msg); - log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t); - - } - }, context.getExecutor()); - } - - - private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { - SparkplugSessionCtx result = devices.get(deviceName); - if (result == null) { - Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); - deviceCreationLock.lock(); - try { - result = devices.get(deviceName); - if (result == null) { - return getDeviceCreationFuture(deviceName, deviceType); - } else { - return Futures.immediateFuture(result); - } - } finally { - deviceCreationLock.unlock(); - } - } else { - return Futures.immediateFuture(result); - } - } - - private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { - final SettableFuture futureToSet = SettableFuture.create(); - ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); - if (future != null) { - return future; - } - try { - transportService.process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() - .setDeviceName(deviceName) - .setDeviceType(deviceType) - .setGatewayIdMSB(nodeSparkplugInfo.getDeviceId().getId().getMostSignificantBits()) - .setGatewayIdLSB(nodeSparkplugInfo.getDeviceId().getId().getLeastSignificantBits()) - .setSparkplug(true) - .build(), - new TransportServiceCallback<>() { - @Override - public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - if (msg.getDeviceInfo() == null) { - System.out.println("DeviceInfo == null"); - } - SparkplugSessionCtx nodeSparkplugSessionCtx = new SparkplugSessionCtx(SparkplugNodeSessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); - if (devices.putIfAbsent(deviceName, nodeSparkplugSessionCtx) == null) { - log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); - SessionInfoProto deviceSessionInfo = nodeSparkplugSessionCtx.getSessionInfo(); - transportService.registerAsyncSession(deviceSessionInfo, nodeSparkplugSessionCtx); - transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() - .setSessionInfo(deviceSessionInfo) - .setSessionEvent(SESSION_EVENT_MSG_OPEN) - .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) - .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) - .build(), null); - } - futureToSet.set(devices.get(deviceName)); - deviceFutures.remove(deviceName); - } - - @Override - public void onError(Throwable e) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); - futureToSet.setException(e); - deviceFutures.remove(deviceName); - } - }); - return futureToSet; - } catch (Throwable e) { - deviceFutures.remove(deviceName); - throw e; - } - } - } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java index 0879a635c1..2b02571b4e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java @@ -15,132 +15,25 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.handler.codec.mqtt.MqttMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; -import java.util.UUID; import java.util.concurrent.ConcurrentMap; /** * Created by nickAS21 on 08.12.22 */ @Slf4j -public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { +public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext { - private final SparkplugNodeSessionHandler parent; - private final TransportService transportService; - - public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo, - DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + public SparkplugSessionCtx(AbstractGatewaySessionHandler parent, + TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, + ConcurrentMap mqttQoSMap, TransportService transportService) { - super(UUID.randomUUID(), mqttQoSMap); - this.parent = parent; - setSessionInfo(SessionInfoProto.newBuilder() - .setNodeId(parent.getNodeId()) - .setSessionIdMSB(sessionId.getMostSignificantBits()) - .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits()) - .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits()) - .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits()) - .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits()) - .setDeviceName(deviceInfo.getDeviceName()) - .setDeviceType(deviceInfo.getDeviceType()) - .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) - .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits()) - .setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits()) - .setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits()) - .build()); - setDeviceInfo(deviceInfo); - setConnected(true); - setDeviceProfile(deviceProfile); - this.transportService = transportService; - } - - @Override - public UUID getSessionId() { - return sessionId; - } - - @Override - public int nextMsgId() { - return parent.nextMsgId(); - } - - @Override - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { -// try { -// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); -// } catch (Exception e) { -// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); -// } - } - - @Override - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { -// log.trace("[{}] Received attributes update notification to device", sessionId); -// try { -// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); -// } catch (Exception e) { -// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); -// } - } - - @Override - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) { -// log.trace("[{}] Received RPC command to device", sessionId); -// try { -// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( -// payload -> { -// ChannelFuture channelFuture = parent.writeAndFlush(payload); -// if (request.getPersisted()) { -// channelFuture.addListener(result -> { -// if (result.cause() == null) { -// if (!isAckExpected(payload)) { -// transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); -// } else if (request.getPersisted()) { -// transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); -// -// } -// } -// }); -// } -// } -// ); -// } catch (Exception e) { -// transportService.process(getSessionInfo(), -// TransportProtos.ToDeviceRpcResponseMsg.newBuilder() -// .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); -// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); -// } - } - - @Override - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); - parent.deregisterSession(getDeviceInfo().getDeviceName()); - } - - @Override - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { - // This feature is not supported in the TB IoT Gateway yet. - } - - @Override - public void onDeviceDeleted(DeviceId deviceId) { - parent.onDeviceDeleted(this.getSessionInfo().getDeviceName()); - } - - private boolean isAckExpected(MqttMessage message) { - return message.fixedHeader().qosLevel().value() > 0; + super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); } } diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java index de3bbba8e0..4b42ccf564 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandlerTest.java @@ -46,7 +46,7 @@ public class GatewaySessionHandlerTest { @Test public void givenConcurrentReferenceHashMap_WhenGC_thenMapIsEmpty() { - AbstractGatewaySessionHandler gsh = mock(AbstractGatewaySessionHandler.class); + GatewaySessionHandler gsh = mock(GatewaySessionHandler.class); willCallRealMethod().given(gsh).createWeakMap(); ConcurrentMap map = gsh.createWeakMap();