Added ProvisionTransportType to get protocol

This commit is contained in:
zbeacon 2020-10-06 18:27:58 +03:00
parent 616f6a8b4f
commit 2ec09e08ff
2 changed files with 11 additions and 1 deletions

View File

@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.transport.SessionMsgListener;
@ -159,6 +160,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (e.getCause().toString().contains("JsonSyntaxException")) {
TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg);
transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg));
deviceSessionCtx.setProvisionPayloadType(TransportPayloadType.PROTOBUF);
log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
} else {
throw e;
@ -326,7 +328,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) {
deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} else {
deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
}
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
}

View File

@ -55,6 +55,10 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
@Getter
@Setter
private TransportPayloadType provisionPayloadType = payloadType;
public DeviceSessionCtx(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, MqttTransportContext context) {
super(sessionId, mqttQoSMap);
this.context = context;