From 2ec09e08ff377d84b02e1223009bb76dc0aad594 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Tue, 6 Oct 2020 18:27:58 +0300 Subject: [PATCH] Added ProvisionTransportType to get protocol --- .../server/transport/mqtt/MqttTransportHandler.java | 8 +++++++- .../server/transport/mqtt/session/DeviceSessionCtx.java | 4 ++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fcb9b53e2b..fdbcf01ac7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.transport.SessionMsgListener; @@ -159,6 +160,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (e.getCause().toString().contains("JsonSyntaxException")) { TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg); transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg)); + deviceSessionCtx.setProvisionPayloadType(TransportPayloadType.PROTOBUF); log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId); } else { throw e; @@ -326,7 +328,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.writeAndFlush(createMqttPubAckMsg(msgId)); } try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) { + deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } else { + deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } } catch (Exception e) { log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index c0c5e4cc52..20ddfb5a24 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -55,6 +55,10 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; + @Getter + @Setter + private TransportPayloadType provisionPayloadType = payloadType; + public DeviceSessionCtx(UUID sessionId, ConcurrentMap mqttQoSMap, MqttTransportContext context) { super(sessionId, mqttQoSMap); this.context = context;