diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index 751bde6303..c072311fe2 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 9ff19b7111..ff0debcd4b 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -7,10 +7,13 @@ import org.thingsboard.server.gen.transport.TransportProtos; */ public interface TransportService { - void process(TransportProtos.SessionEventMsg msg); + void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, + TransportServiceCallback callback); - void process(TransportProtos.PostTelemetryMsg msg); + void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback callback); - void process(TransportProtos.PostAttributeMsg msg); + void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback); + + void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java new file mode 100644 index 0000000000..bfc7ac890e --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java @@ -0,0 +1,11 @@ +package org.thingsboard.server.common.transport; + +/** + * Created by ashvayka on 04.10.18. + */ +public interface TransportServiceCallback { + + void onSuccess(T msg); + void onError(Exception e); + +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 2e7b412c31..28269454c7 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.session; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceCredentialsFilter; @@ -23,56 +24,27 @@ import org.thingsboard.server.common.msg.session.SessionContext; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthResult; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.Optional; /** * @author Andrew Shvayka */ -@Slf4j +@Data public abstract class DeviceAwareSessionContext implements SessionContext { - protected final DeviceAuthService authService; - protected final SessionMsgProcessor processor; + private volatile TransportProtos.DeviceInfoProto deviceInfo; - protected volatile Device device; - - public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService) { - this.processor = processor; - this.authService = authService; + public long getDeviceIdMSB() { + return deviceInfo.getDeviceIdMSB(); } - public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) { - this(processor, authService); - this.device = device; + public long getDeviceIdLSB() { + return deviceInfo.getDeviceIdLSB(); } - - public boolean login(DeviceCredentialsFilter credentials) { - DeviceAuthResult result = authService.process(credentials); - if (result.isSuccess()) { - Optional deviceOpt = authService.findDeviceById(result.getDeviceId()); - if (deviceOpt.isPresent()) { - device = deviceOpt.get(); - } - return true; - } else { - log.debug("Can't find device using credentials [{}] due to {}", credentials, result.getErrorMsg()); - return false; - } + public boolean isConnected() { + return deviceInfo != null; } - - public DeviceAuthService getAuthService() { - return authService; - } - - public SessionMsgProcessor getProcessor() { - return processor; - } - - public Device getDevice() { - return device; - } - - } diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index d77e79bbd0..65d1468605 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -46,6 +46,14 @@ message TsKvListProto { repeated KeyValueProto kv = 2; } +message DeviceInfoProto { + int64 deviceIdMSB = 1; + int64 deviceIdLSB = 2; + string deviceName = 3; + string deviceType = 4; + string additionalInfo = 5; +} + /** * Messages that use Data Structures; */ @@ -53,6 +61,7 @@ message SessionEventMsg { SessionInfoProto sessionInfo = 1; int64 deviceIdMSB = 2; int64 deviceIdLSB = 3; + SessionEvent event = 4; } message PostTelemetryMsg { @@ -76,4 +85,13 @@ message GetAttributeResponseMsg { repeated TsKvListProto clientAttributeList = 2; repeated TsKvListProto sharedAttributeList = 3; repeated string deletedAttributeKeys = 4; -} \ No newline at end of file +} + +message ValidateDeviceTokenRequestMsg { + string token = 1; +} + +message ValidateDeviceTokenResponseMsg { + DeviceInfoProto deviceInfo = 1; +} + diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml new file mode 100644 index 0000000000..8d8b24d6c6 --- /dev/null +++ b/transport/mqtt-transport/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + org.thingsboard + 2.2.0-SNAPSHOT + transport + + org.thingsboard.transport + mqtt-transport + jar + + Thingsboard MQTT Transport Service + https://thingsboard.io + + + UTF-8 + ${basedir}/../.. + + + + + org.thingsboard.common + transport + + + io.netty + netty-all + + + org.springframework + spring-context + + + org.slf4j + slf4j-api + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + + com.google.guava + guava + + + org.springframework.boot + spring-boot-starter-test + test + + + junit + junit + test + + + org.mockito + mockito-all + test + + + + diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java new file mode 100644 index 0000000000..3c54938a37 --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java @@ -0,0 +1,48 @@ +package org.thingsboard.server.mqtt; /** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +import java.util.Arrays; + +@SpringBootConfiguration +@EnableAsync +@EnableScheduling +@ComponentScan({"org.thingsboard.server"}) +public class ThingsboardMqttTransportApplication { + + private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name"; + private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport"; + + public static void main(String[] args) { + SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args)); + } + + private static String[] updateArguments(String[] args) { + if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) { + String[] modifiedArgs = new String[args.length + 1]; + System.arraycopy(args, 0, modifiedArgs, 0, args.length); + modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM; + return modifiedArgs; + } + return args; + } +} diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml new file mode 100644 index 0000000000..c12f25edcf --- /dev/null +++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml @@ -0,0 +1,57 @@ +# +# Copyright © 2016-2018 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.main.web-application-type: none + +# MQTT server parameters +mqtt: + # Enable/disable mqtt transport protocol. + enabled: "${MQTT_ENABLED:true}" + bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}" + bind_port: "${MQTT_BIND_PORT:1883}" + adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" + timeout: "${MQTT_TIMEOUT:10000}" + netty: + leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" + boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" + worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" + max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}" + # MQTT SSL configuration + ssl: + # Enable/disable SSL support + enabled: "${MQTT_SSL_ENABLED:false}" + # SSL protocol: See http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext + protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}" + # Path to the key store that holds the SSL certificate + key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}" + # Password used to access the key store + key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}" + # Password used to access the key + key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" + # Type of the key store + key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + +kafka: + enabled: true + bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + acks: "${TB_KAFKA_ACKS:all}" + retries: "${TB_KAFKA_RETRIES:1}" + batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" + linger.ms: "${TB_KAFKA_LINGER_MS:1}" + buffer.memory: "${TB_BUFFER_MEMORY:33554432}" + topic: + telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}" + requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}" \ No newline at end of file diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 43e2a3440e..cc40ee0975 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -1,22 +1,33 @@ package org.thingsboard.server.transport.mqtt; +import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.handler.ssl.SslHandler; import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; +import javax.annotation.PostConstruct; +import java.net.InetAddress; +import java.net.UnknownHostException; + /** * Created by ashvayka on 04.10.18. */ +@Slf4j @Component @Data public class MqttTransportContext { + private final ObjectMapper mapper = new ObjectMapper(); + @Autowired @Lazy private TransportService transportService; @@ -33,6 +44,21 @@ public class MqttTransportContext { @Value("${mqtt.netty.max_payload_size}") private Integer maxPayloadSize; + @Value("${cluster.node_id:#{null}}") + private String nodeId; + private SslHandler sslHandler; + @PostConstruct + public void init() { + if (StringUtils.isEmpty(nodeId)) { + try { + nodeId = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + nodeId = RandomStringUtils.randomAlphabetic(10); + } + } + log.info("Current NodeId: {}", nodeId); + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index ed0c8c642c..fd02d3aa39 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,9 +18,21 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -34,13 +46,12 @@ import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; -import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.EncryptionUtil; -import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; @@ -48,18 +59,51 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; -import static io.netty.handler.codec.mqtt.MqttMessageType.*; -import static io.netty.handler.codec.mqtt.MqttQoS.*; -import static org.thingsboard.server.common.msg.session.SessionMsgType.*; -import static org.thingsboard.server.transport.mqtt.MqttTopics.*; +import org.thingsboard.server.gen.transport.TransportProtos.*; + +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; +import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK; +import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP; +import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; +import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; +import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; +import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE; +import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST; +import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC; +import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC; /** * @author Andrew Shvayka @@ -69,33 +113,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; - private final DeviceSessionCtx deviceSessionCtx; - private final String sessionId; + private final UUID sessionId; + private final MqttTransportContext context; private final MqttTransportAdaptor adaptor; - private final SessionMsgProcessor processor; - private final DeviceService deviceService; - private final DeviceAuthService authService; - private final RelationService relationService; + private final TransportService transportService; private final QuotaService quotaService; private final SslHandler sslHandler; private final ConcurrentMap mqttQoSMap; - private volatile boolean connected; + private final SessionInfoProto sessionInfo; + private volatile InetSocketAddress address; + private volatile DeviceSessionCtx deviceSessionCtx; private volatile GatewaySessionCtx gatewaySessionCtx; - public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, - MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { - this.processor = processor; - this.deviceService = deviceService; - this.relationService = relationService; - this.authService = authService; - this.adaptor = adaptor; + public MqttTransportHandler(MqttTransportContext context) { + this.sessionId = UUID.randomUUID(); + this.context = context; + this.transportService = context.getTransportService(); + this.adaptor = context.getAdaptor(); + this.quotaService = context.getQuotaService(); + this.sslHandler = context.getSslHandler(); this.mqttQoSMap = new ConcurrentHashMap<>(); - this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap); - this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); - this.sslHandler = sslHandler; - this.quotaService = quotaService; + this.sessionInfo = SessionInfoProto.newBuilder() + .setNodeId(context.getNodeId()) + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .build(); + this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap); } @Override @@ -127,196 +172,196 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case CONNECT: processConnect(ctx, (MqttConnectMessage) msg); break; - case PUBLISH: - processPublish(ctx, (MqttPublishMessage) msg); - break; - case SUBSCRIBE: - processSubscribe(ctx, (MqttSubscribeMessage) msg); - break; - case UNSUBSCRIBE: - processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); - break; - case PINGREQ: - if (checkConnected(ctx)) { - ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); - } - break; - case DISCONNECT: - if (checkConnected(ctx)) { - processDisconnect(ctx); - } - break; +// case PUBLISH: +// processPublish(ctx, (MqttPublishMessage) msg); +// break; +// case SUBSCRIBE: +// processSubscribe(ctx, (MqttSubscribeMessage) msg); +// break; +// case UNSUBSCRIBE: +// processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); +// break; +// case PINGREQ: +// if (checkConnected(ctx)) { +// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); +// } +// break; +// case DISCONNECT: +// if (checkConnected(ctx)) { +// processDisconnect(ctx); +// } +// break; default: break; } } - private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { - if (!checkConnected(ctx)) { - return; - } - String topicName = mqttMsg.variableHeader().topicName(); - int msgId = mqttMsg.variableHeader().messageId(); - log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); - - if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) { - if (gatewaySessionCtx != null) { - gatewaySessionCtx.setChannel(ctx); - handleMqttPublishMsg(topicName, msgId, mqttMsg); - } - } else { - processDevicePublish(ctx, mqttMsg, topicName, msgId); - } - } - - private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { - try { - switch (topicName) { - case GATEWAY_TELEMETRY_TOPIC: - gatewaySessionCtx.onDeviceTelemetry(mqttMsg); - break; - case GATEWAY_ATTRIBUTES_TOPIC: - gatewaySessionCtx.onDeviceAttributes(mqttMsg); - break; - case GATEWAY_ATTRIBUTES_REQUEST_TOPIC: - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); - break; - case GATEWAY_RPC_TOPIC: - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); - break; - case GATEWAY_CONNECT_TOPIC: - gatewaySessionCtx.onDeviceConnect(mqttMsg); - break; - case GATEWAY_DISCONNECT_TOPIC: - gatewaySessionCtx.onDeviceDisconnect(mqttMsg); - break; - } - } catch (RuntimeException | AdaptorException e) { - log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - } - } - - private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { - AdaptorToSessionActorMsg msg = null; - try { - if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { - msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg); - } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { - msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg); - } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { - msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg); - if (msgId >= 0) { - ctx.writeAndFlush(createMqttPubAckMsg(msgId)); - } - } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) { - msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg); - if (msgId >= 0) { - ctx.writeAndFlush(createMqttPubAckMsg(msgId)); - } - } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) { - msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg); - if (msgId >= 0) { - ctx.writeAndFlush(createMqttPubAckMsg(msgId)); - } - } - } catch (AdaptorException e) { - log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - } - if (msg != null) { - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - } else { - log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); - ctx.close(); - } - } - - private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { - if (!checkConnected(ctx)) { - return; - } - log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); - List grantedQoSList = new ArrayList<>(); - for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { - String topic = subscription.topicName(); - MqttQoS reqQoS = subscription.qualityOfService(); - try { - switch (topic) { - case DEVICE_ATTRIBUTES_TOPIC: { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - } - case DEVICE_RPC_REQUESTS_SUB_TOPIC: { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - } - case DEVICE_RPC_RESPONSE_SUB_TOPIC: - case GATEWAY_ATTRIBUTES_TOPIC: - case GATEWAY_RPC_TOPIC: - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - deviceSessionCtx.setAllowAttributeResponses(); - registerSubQoS(topic, grantedQoSList, reqQoS); - break; - default: - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(FAILURE.value()); - break; - } - } catch (AdaptorException e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(FAILURE.value()); - } - } - ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); - } - - private void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { - grantedQoSList.add(getMinSupportedQos(reqQoS)); - mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); - } - - private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { - if (!checkConnected(ctx)) { - return; - } - log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); - for (String topicName : mqttMsg.payload().topics()) { - mqttQoSMap.remove(topicName); - try { - switch (topicName) { - case DEVICE_ATTRIBUTES_TOPIC: { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - break; - } - case DEVICE_RPC_REQUESTS_SUB_TOPIC: { - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); - break; - } - case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: - deviceSessionCtx.setDisallowAttributeResponses(); - break; - } - } catch (AdaptorException e) { - log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); - } - } - ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); - } - - private MqttMessage createUnSubAckMessage(int msgId) { - MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0); - MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); - return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); - } +// private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { +// if (!checkConnected(ctx)) { +// return; +// } +// String topicName = mqttMsg.variableHeader().topicName(); +// int msgId = mqttMsg.variableHeader().packetId(); +// log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); +// +// if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) { +// if (gatewaySessionCtx != null) { +// gatewaySessionCtx.setChannel(ctx); +// handleMqttPublishMsg(topicName, msgId, mqttMsg); +// } +// } else { +// processDevicePublish(ctx, mqttMsg, topicName, msgId); +// } +// } +// +// private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { +// try { +// switch (topicName) { +// case GATEWAY_TELEMETRY_TOPIC: +// gatewaySessionCtx.onDeviceTelemetry(mqttMsg); +// break; +// case GATEWAY_ATTRIBUTES_TOPIC: +// gatewaySessionCtx.onDeviceAttributes(mqttMsg); +// break; +// case GATEWAY_ATTRIBUTES_REQUEST_TOPIC: +// gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); +// break; +// case GATEWAY_RPC_TOPIC: +// gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); +// break; +// case GATEWAY_CONNECT_TOPIC: +// gatewaySessionCtx.onDeviceConnect(mqttMsg); +// break; +// case GATEWAY_DISCONNECT_TOPIC: +// gatewaySessionCtx.onDeviceDisconnect(mqttMsg); +// break; +// } +// } catch (RuntimeException | AdaptorException e) { +// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); +// } +// } +// +// private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { +// AdaptorToSessionActorMsg msg = null; +// try { +// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { +// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg); +// } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { +// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg); +// } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { +// msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg); +// if (msgId >= 0) { +// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); +// } +// } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) { +// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg); +// if (msgId >= 0) { +// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); +// } +// } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) { +// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg); +// if (msgId >= 0) { +// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); +// } +// } +// } catch (AdaptorException e) { +// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); +// } +// if (msg != null) { +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); +// } else { +// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); +// ctx.close(); +// } +// } +// +// private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { +// if (!checkConnected(ctx)) { +// return; +// } +// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); +// List grantedQoSList = new ArrayList<>(); +// for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { +// String topic = subscription.topicName(); +// MqttQoS reqQoS = subscription.qualityOfService(); +// try { +// switch (topic) { +// case DEVICE_ATTRIBUTES_TOPIC: { +// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); +// registerSubQoS(topic, grantedQoSList, reqQoS); +// break; +// } +// case DEVICE_RPC_REQUESTS_SUB_TOPIC: { +// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); +// registerSubQoS(topic, grantedQoSList, reqQoS); +// break; +// } +// case DEVICE_RPC_RESPONSE_SUB_TOPIC: +// case GATEWAY_ATTRIBUTES_TOPIC: +// case GATEWAY_RPC_TOPIC: +// registerSubQoS(topic, grantedQoSList, reqQoS); +// break; +// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: +// deviceSessionCtx.setAllowAttributeResponses(); +// registerSubQoS(topic, grantedQoSList, reqQoS); +// break; +// default: +// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); +// grantedQoSList.add(FAILURE.value()); +// break; +// } +// } catch (AdaptorException e) { +// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); +// grantedQoSList.add(FAILURE.value()); +// } +// } +// ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); +// } +// +// private void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { +// grantedQoSList.add(getMinSupportedQos(reqQoS)); +// mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); +// } +// +// private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { +// if (!checkConnected(ctx)) { +// return; +// } +// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); +// for (String topicName : mqttMsg.payload().topics()) { +// mqttQoSMap.remove(topicName); +// try { +// switch (topicName) { +// case DEVICE_ATTRIBUTES_TOPIC: { +// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); +// break; +// } +// case DEVICE_RPC_REQUESTS_SUB_TOPIC: { +// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); +// break; +// } +// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: +// deviceSessionCtx.setDisallowAttributeResponses(); +// break; +// } +// } catch (AdaptorException e) { +// log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); +// } +// } +// ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); +// } +// +// private MqttMessage createUnSubAckMessage(int msgId) { +// MqttFixedHeader mqttFixedHeader = +// new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0); +// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); +// return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); +// } private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); @@ -333,36 +378,58 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (StringUtils.isEmpty(userName)) { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); ctx.close(); - } else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); - ctx.close(); } else { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); - connected = true; - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); - checkGatewaySession(); + transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), + new TransportServiceCallback() { + @Override + public void onSuccess(ValidateDeviceTokenResponseMsg msg) { + if (!msg.hasDeviceInfo()) { + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); + ctx.close(); + } else { + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); + deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo()); + transportService.process(getSessionEventMsg(SessionEvent.OPEN), null); + checkGatewaySession(); + } + } + + @Override + public void onError(Exception e) { + log.trace("[{}] Failed to process credentials: {}", address, userName, e); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); + ctx.close(); + } + }); } } + protected SessionEventMsg getSessionEventMsg(SessionEvent event) { + return SessionEventMsg.newBuilder() + .setSessionInfo(sessionInfo) + .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB()) + .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB()) + .setEvent(event).build(); + } + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { - try { - String strCert = SslUtil.getX509CertificateString(cert); - String sha3Hash = EncryptionUtil.getSha3Hash(strCert); - if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); - connected = true; - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); - checkGatewaySession(); - } else { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); - ctx.close(); - } - } catch (Exception e) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); - ctx.close(); - } +// try { +// String strCert = SslUtil.getX509CertificateString(cert); +// String sha3Hash = EncryptionUtil.getSha3Hash(strCert); +// if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) { +// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); +// connected = true; +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), +// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); +// checkGatewaySession(); +// } else { +// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); +// ctx.close(); +// } +// } catch (Exception e) { +// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); +// ctx.close(); +// } } private X509Certificate getX509Certificate() { @@ -380,8 +447,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); - if (connected) { - processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); + if (deviceSessionCtx.isConnected()) { + transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); if (gatewaySessionCtx != null) { gatewaySessionCtx.onGatewayDisconnect(); } @@ -428,7 +495,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private boolean checkConnected(ChannelHandlerContext ctx) { - if (connected) { + if (deviceSessionCtx.isConnected()) { return true; } else { log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId); @@ -438,18 +505,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void checkGatewaySession() { - Device device = deviceSessionCtx.getDevice(); - JsonNode infoNode = device.getAdditionalInfo(); - if (infoNode != null) { - JsonNode gatewayNode = infoNode.get("gateway"); - if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx); + DeviceInfoProto device = deviceSessionCtx.getDeviceInfo(); + try { + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo()); + if (infoNode != null) { + JsonNode gatewayNode = infoNode.get("gateway"); + if (gatewayNode != null && gatewayNode.asBoolean()) { + gatewaySessionCtx = new GatewaySessionCtx(deviceSessionCtx); + } } + } catch (IOException e) { + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e); } } @Override public void operationComplete(Future future) throws Exception { - processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId())); + transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 3dbb3efe0e..3f29a82fe1 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -40,15 +40,13 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { - private final MqttTransportAdaptor adaptor; private final MqttSessionId sessionId; private ChannelHandlerContext channel; private volatile boolean allowAttributeResponses; private AtomicInteger msgIdSeq = new AtomicInteger(0); - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap mqttQoSMap) { - super(processor, authService, mqttQoSMap); - this.adaptor = adaptor; + public DeviceSessionCtx(ConcurrentMap mqttQoSMap) { + super(null, null, null); this.sessionId = new MqttSessionId(); } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 98ad6d2c2c..dec86028b7 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -78,6 +78,10 @@ public class GatewaySessionCtx { this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); } + public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) { + + } + public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { JsonElement json = getJson(msg); String deviceName = checkDeviceName(getDeviceName(json)); diff --git a/transport/pom.xml b/transport/pom.xml index 95b0112a2a..a01c7ac7ba 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -38,6 +38,7 @@ http coap mqtt + mqtt-transport