sparkplug: connection2

This commit is contained in:
nickAS21 2022-12-15 11:41:12 +02:00
parent 1bf02b75f2
commit 81c484d745

View File

@ -15,15 +15,12 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@ -32,15 +29,15 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
* Created by ashvayka on 19.01.17. * Created by nickAS21 on 08.12.22
*/ */
@Slf4j @Slf4j
public class SparkplugNodeSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { public class SparkplugSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
private final GatewaySessionHandler parent; private final SparkplugNodeSessionHandler parent;
private final TransportService transportService; private final TransportService transportService;
public SparkplugNodeSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, public SparkplugSessionCtx(SparkplugNodeSessionHandler parent, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) { TransportService transportService) {
super(UUID.randomUUID(), mqttQoSMap); super(UUID.randomUUID(), mqttQoSMap);
@ -80,50 +77,50 @@ public class SparkplugNodeSessionCtx extends MqttDeviceAwareSessionContext imple
@Override @Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
try { // try {
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); // parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush);
} catch (Exception e) { // } catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); // log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
} // }
} }
@Override @Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to device", sessionId); // log.trace("[{}] Received attributes update notification to device", sessionId);
try { // try {
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); // parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
} catch (Exception e) { // } catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); // log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
} // }
} }
@Override @Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) { public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
log.trace("[{}] Received RPC command to device", sessionId); // log.trace("[{}] Received RPC command to device", sessionId);
try { // try {
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( // parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
payload -> { // payload -> {
ChannelFuture channelFuture = parent.writeAndFlush(payload); // ChannelFuture channelFuture = parent.writeAndFlush(payload);
if (request.getPersisted()) { // if (request.getPersisted()) {
channelFuture.addListener(result -> { // channelFuture.addListener(result -> {
if (result.cause() == null) { // if (result.cause() == null) {
if (!isAckExpected(payload)) { // if (!isAckExpected(payload)) {
transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); // transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
} else if (request.getPersisted()) { // } else if (request.getPersisted()) {
transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); // transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
//
} // }
} // }
}); // });
} // }
} // }
); // );
} catch (Exception e) { // } catch (Exception e) {
transportService.process(getSessionInfo(), // transportService.process(getSessionInfo(),
TransportProtos.ToDeviceRpcResponseMsg.newBuilder() // TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); // .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); // log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
} // }
} }
@Override @Override