From 81c484d745a4a459b6180d889ae68f6be6629c44 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 15 Dec 2022 11:41:12 +0200 Subject: [PATCH] sparkplug: connection2 --- ...ssionCtx.java => SparkplugSessionCtx.java} | 87 +++++++++---------- 1 file changed, 42 insertions(+), 45 deletions(-) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/{SparkplugNodeSessionCtx.java => SparkplugSessionCtx.java} (59%) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java similarity index 59% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionCtx.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java index 7ba309e450..0879a635c1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.channel.ChannelFuture; import io.netty.handler.codec.mqtt.MqttMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; @@ -32,17 +29,17 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; /** - * Created by ashvayka on 19.01.17. + * Created by nickAS21 on 08.12.22 */ @Slf4j -public class SparkplugNodeSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { +public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { - private final GatewaySessionHandler parent; + private final SparkplugNodeSessionHandler parent; private final TransportService transportService; - public SparkplugNodeSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, - DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, - TransportService transportService) { + public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + TransportService transportService) { super(UUID.randomUUID(), mqttQoSMap); this.parent = parent; setSessionInfo(SessionInfoProto.newBuilder() @@ -80,50 +77,50 @@ public class SparkplugNodeSessionCtx extends MqttDeviceAwareSessionContext imple @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { - try { - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); - } catch (Exception e) { - log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); - } +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); +// } catch (Exception e) { +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } } @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { - log.trace("[{}] Received attributes update notification to device", sessionId); - try { - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); - } catch (Exception e) { - log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); - } +// log.trace("[{}] Received attributes update notification to device", sessionId); +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); +// } catch (Exception e) { +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } } @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) { - log.trace("[{}] Received RPC command to device", sessionId); - try { - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( - payload -> { - ChannelFuture channelFuture = parent.writeAndFlush(payload); - if (request.getPersisted()) { - channelFuture.addListener(result -> { - if (result.cause() == null) { - if (!isAckExpected(payload)) { - transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); - } else if (request.getPersisted()) { - transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); - - } - } - }); - } - } - ); - } catch (Exception e) { - transportService.process(getSessionInfo(), - TransportProtos.ToDeviceRpcResponseMsg.newBuilder() - .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); - log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); - } +// log.trace("[{}] Received RPC command to device", sessionId); +// try { +// parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( +// payload -> { +// ChannelFuture channelFuture = parent.writeAndFlush(payload); +// if (request.getPersisted()) { +// channelFuture.addListener(result -> { +// if (result.cause() == null) { +// if (!isAckExpected(payload)) { +// transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); +// } else if (request.getPersisted()) { +// transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); +// +// } +// } +// }); +// } +// } +// ); +// } catch (Exception e) { +// transportService.process(getSessionInfo(), +// TransportProtos.ToDeviceRpcResponseMsg.newBuilder() +// .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); +// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); +// } } @Override