diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index f0fb51e6e8..2e7b412c31 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -73,4 +73,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public Device getDevice() { return device; } + + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 091b45d3ec..5effd2b497 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -53,6 +53,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static io.netty.handler.codec.mqtt.MqttMessageType.*; @@ -77,12 +78,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final RelationService relationService; private final QuotaService quotaService; private final SslHandler sslHandler; + private final ConcurrentMap mqttQoSMap; + private volatile boolean connected; private volatile InetSocketAddress address; private volatile GatewaySessionCtx gatewaySessionCtx; - private Map mqttQoSMap = new ConcurrentHashMap<>(); - public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { this.processor = processor; @@ -90,7 +91,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.relationService = relationService; this.authService = authService; this.adaptor = adaptor; - this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); + this.mqttQoSMap = new ConcurrentHashMap<>(); + this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap); this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); this.sslHandler = sslHandler; this.quotaService = quotaService; @@ -170,18 +172,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { try { - if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) { - gatewaySessionCtx.onDeviceTelemetry(mqttMsg); - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { - gatewaySessionCtx.onDeviceAttributes(mqttMsg); - } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); - } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); - } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { - gatewaySessionCtx.onDeviceConnect(mqttMsg); - } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { - gatewaySessionCtx.onDeviceDisconnect(mqttMsg); + switch (topicName) { + case GATEWAY_TELEMETRY_TOPIC: + gatewaySessionCtx.onDeviceTelemetry(mqttMsg); + break; + case GATEWAY_ATTRIBUTES_TOPIC: + gatewaySessionCtx.onDeviceAttributes(mqttMsg); + break; + case GATEWAY_ATTRIBUTES_REQUEST_TOPIC: + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); + break; + case GATEWAY_RPC_TOPIC: + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); + break; + case GATEWAY_CONNECT_TOPIC: + gatewaySessionCtx.onDeviceConnect(mqttMsg); + break; + case GATEWAY_DISCONNECT_TOPIC: + gatewaySessionCtx.onDeviceDisconnect(mqttMsg); + break; } } catch (RuntimeException | AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); @@ -229,40 +238,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); List grantedQoSList = new ArrayList<>(); for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { - String topicName = subscription.topicName(); - //TODO: handle this qos level. + String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); - mqttQoSMap.put(topicName, reqQoS); try { - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - grantedQoSList.add(getMinSupportedQos(reqQoS)); - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - grantedQoSList.add(getMinSupportedQos(reqQoS)); - } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) { - grantedQoSList.add(getMinSupportedQos(reqQoS)); - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { - deviceSessionCtx.setAllowAttributeResponses(); - grantedQoSList.add(getMinSupportedQos(reqQoS)); - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { - grantedQoSList.add(getMinSupportedQos(reqQoS)); - }else if (topicName.equals(GATEWAY_RPC_TOPIC)) { - grantedQoSList.add(getMinSupportedQos(reqQoS)); - } else { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); - grantedQoSList.add(FAILURE.value()); + switch (topic) { + case DEVICE_ATTRIBUTES_TOPIC: { + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + } + case DEVICE_RPC_REQUESTS_SUB_TOPIC: { + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + } + case DEVICE_RPC_RESPONSE_SUB_TOPIC: + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + deviceSessionCtx.setAllowAttributeResponses(); + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + case GATEWAY_ATTRIBUTES_TOPIC: + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + case GATEWAY_RPC_TOPIC: + registerSubQoS(topic, grantedQoSList, reqQoS); + break; + default: + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(FAILURE.value()); + break; } } catch (AdaptorException e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); grantedQoSList.add(FAILURE.value()); } } ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); } + private void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { + grantedQoSList.add(getMinSupportedQos(reqQoS)); + mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); + } + private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { if (!checkConnected(ctx)) { return; @@ -271,14 +293,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (String topicName : mqttMsg.payload().topics()) { mqttQoSMap.remove(topicName); try { - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { - deviceSessionCtx.setDisallowAttributeResponses(); + switch (topicName) { + case DEVICE_ATTRIBUTES_TOPIC: { + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); + break; + } + case DEVICE_RPC_REQUESTS_SUB_TOPIC: { + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); + break; + } + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: + deviceSessionCtx.setDisallowAttributeResponses(); + break; } } catch (AdaptorException e) { log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index f0b29cbd17..c8baaf7e53 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -170,7 +170,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) { MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); + new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0); MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId()); ByteBuf payload = ALLOCATOR.buffer(); payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 9367a04407..3dbb3efe0e 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -30,13 +30,15 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * @author Andrew Shvayka */ @Slf4j -public class DeviceSessionCtx extends DeviceAwareSessionContext { +public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private final MqttTransportAdaptor adaptor; private final MqttSessionId sessionId; @@ -44,8 +46,8 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext { private volatile boolean allowAttributeResponses; private AtomicInteger msgIdSeq = new AtomicInteger(0); - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor) { - super(processor, authService); + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap mqttQoSMap) { + super(processor, authService, mqttQoSMap); this.adaptor = adaptor; this.sessionId = new MqttSessionId(); } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index dd9c9215a6..e5be5c71a1 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -38,13 +38,15 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import java.nio.charset.Charset; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. */ -public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { +public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { private static final Gson GSON = new Gson(); private static final Charset UTF8 = Charset.forName("UTF-8"); @@ -56,8 +58,8 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { private volatile boolean closed; private AtomicInteger msgIdSeq = new AtomicInteger(0); - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { - super(parent.getProcessor(), parent.getAuthService(), device); + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap mqttQoSMap) { + super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap); this.parent = parent; this.sessionId = new MqttSessionId(); } @@ -195,7 +197,7 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); + new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0); MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); ByteBuf payload = ALLOCATOR.buffer(); payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 69ed17f3b0..98ad6d2c2c 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -43,6 +43,7 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; @@ -63,6 +64,7 @@ public class GatewaySessionCtx { private final DeviceAuthService authService; private final RelationService relationService; private final Map devices; + private final ConcurrentMap mqttQoSMap; private ChannelHandlerContext channel; public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { @@ -73,6 +75,7 @@ public class GatewaySessionCtx { this.gateway = gatewaySessionCtx.getDevice(); this.gatewaySessionId = gatewaySessionCtx.getSessionId(); this.devices = new HashMap<>(); + this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); } public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { @@ -96,7 +99,7 @@ public class GatewaySessionCtx { relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); processor.onDeviceAdded(device); } - GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap); devices.put(deviceName, ctx); log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName); processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java new file mode 100644 index 0000000000..f085064016 --- /dev/null +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -0,0 +1,57 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import io.netty.handler.codec.mqtt.MqttQoS; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +/** + * Created by ashvayka on 30.08.18. + */ +public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { + + private final ConcurrentMap mqttQoSMap; + + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap mqttQoSMap) { + super(processor, authService); + this.mqttQoSMap = mqttQoSMap; + } + + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap mqttQoSMap) { + super(processor, authService, device); + this.mqttQoSMap = mqttQoSMap; + } + + public ConcurrentMap getMqttQoSMap() { + return mqttQoSMap; + } + + public MqttQoS getQoSForTopic(String topic) { + Integer qos = mqttQoSMap.get(topic); + if (qos != null) { + return MqttQoS.valueOf(qos); + } else { + return MqttQoS.AT_LEAST_ONCE; + } + } + +}