Initial structure

This commit is contained in:
Andrew Shvayka 2018-10-04 19:18:26 +03:00
parent a6c92bfbde
commit fd6ad413b7
13 changed files with 603 additions and 306 deletions

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -7,10 +7,13 @@ import org.thingsboard.server.gen.transport.TransportProtos;
*/ */
public interface TransportService { public interface TransportService {
void process(TransportProtos.SessionEventMsg msg); void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
void process(TransportProtos.PostTelemetryMsg msg); void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
void process(TransportProtos.PostAttributeMsg msg); void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
} }

View File

@ -0,0 +1,11 @@
package org.thingsboard.server.common.transport;
/**
* Created by ashvayka on 04.10.18.
*/
public interface TransportServiceCallback<T> {
void onSuccess(T msg);
void onError(Exception e);
}

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.common.transport.session; package org.thingsboard.server.common.transport.session;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentialsFilter; import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
@ -23,56 +24,27 @@ import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult; import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional; import java.util.Optional;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Slf4j @Data
public abstract class DeviceAwareSessionContext implements SessionContext { public abstract class DeviceAwareSessionContext implements SessionContext {
protected final DeviceAuthService authService; private volatile TransportProtos.DeviceInfoProto deviceInfo;
protected final SessionMsgProcessor processor;
protected volatile Device device; public long getDeviceIdMSB() {
return deviceInfo.getDeviceIdMSB();
public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService) {
this.processor = processor;
this.authService = authService;
} }
public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) { public long getDeviceIdLSB() {
this(processor, authService); return deviceInfo.getDeviceIdLSB();
this.device = device;
} }
public boolean isConnected() {
public boolean login(DeviceCredentialsFilter credentials) { return deviceInfo != null;
DeviceAuthResult result = authService.process(credentials);
if (result.isSuccess()) {
Optional<Device> deviceOpt = authService.findDeviceById(result.getDeviceId());
if (deviceOpt.isPresent()) {
device = deviceOpt.get();
}
return true;
} else {
log.debug("Can't find device using credentials [{}] due to {}", credentials, result.getErrorMsg());
return false;
}
} }
public DeviceAuthService getAuthService() {
return authService;
}
public SessionMsgProcessor getProcessor() {
return processor;
}
public Device getDevice() {
return device;
}
} }

View File

@ -46,6 +46,14 @@ message TsKvListProto {
repeated KeyValueProto kv = 2; repeated KeyValueProto kv = 2;
} }
message DeviceInfoProto {
int64 deviceIdMSB = 1;
int64 deviceIdLSB = 2;
string deviceName = 3;
string deviceType = 4;
string additionalInfo = 5;
}
/** /**
* Messages that use Data Structures; * Messages that use Data Structures;
*/ */
@ -53,6 +61,7 @@ message SessionEventMsg {
SessionInfoProto sessionInfo = 1; SessionInfoProto sessionInfo = 1;
int64 deviceIdMSB = 2; int64 deviceIdMSB = 2;
int64 deviceIdLSB = 3; int64 deviceIdLSB = 3;
SessionEvent event = 4;
} }
message PostTelemetryMsg { message PostTelemetryMsg {
@ -76,4 +85,13 @@ message GetAttributeResponseMsg {
repeated TsKvListProto clientAttributeList = 2; repeated TsKvListProto clientAttributeList = 2;
repeated TsKvListProto sharedAttributeList = 3; repeated TsKvListProto sharedAttributeList = 3;
repeated string deletedAttributeKeys = 4; repeated string deletedAttributeKeys = 4;
} }
message ValidateDeviceTokenRequestMsg {
string token = 1;
}
message ValidateDeviceTokenResponseMsg {
DeviceInfoProto deviceInfo = 1;
}

View File

@ -0,0 +1,88 @@
<!--
Copyright © 2016-2018 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>2.2.0-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.transport</groupId>
<artifactId>mqtt-transport</artifactId>
<packaging>jar</packaging>
<name>Thingsboard MQTT Transport Service</name>
<url>https://thingsboard.io</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,48 @@
package org.thingsboard.server.mqtt; /**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.Arrays;
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server"})
public class ThingsboardMqttTransportApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";
public static void main(String[] args) {
SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
}
private static String[] updateArguments(String[] args) {
if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
String[] modifiedArgs = new String[args.length + 1];
System.arraycopy(args, 0, modifiedArgs, 0, args.length);
modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
return modifiedArgs;
}
return args;
}
}

View File

@ -0,0 +1,57 @@
#
# Copyright © 2016-2018 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.main.web-application-type: none
# MQTT server parameters
mqtt:
# Enable/disable mqtt transport protocol.
enabled: "${MQTT_ENABLED:true}"
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
bind_port: "${MQTT_BIND_PORT:1883}"
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
timeout: "${MQTT_TIMEOUT:10000}"
netty:
leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
# MQTT SSL configuration
ssl:
# Enable/disable SSL support
enabled: "${MQTT_SSL_ENABLED:false}"
# SSL protocol: See http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext
protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
# Path to the key store that holds the SSL certificate
key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"
# Password used to access the key store
key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"
# Password used to access the key
key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
retries: "${TB_KAFKA_RETRIES:1}"
batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
linger.ms: "${TB_KAFKA_LINGER_MS:1}"
buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
topic:
telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"

View File

@ -1,22 +1,33 @@
package org.thingsboard.server.transport.mqtt; package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
/** /**
* Created by ashvayka on 04.10.18. * Created by ashvayka on 04.10.18.
*/ */
@Slf4j
@Component @Component
@Data @Data
public class MqttTransportContext { public class MqttTransportContext {
private final ObjectMapper mapper = new ObjectMapper();
@Autowired @Autowired
@Lazy @Lazy
private TransportService transportService; private TransportService transportService;
@ -33,6 +44,21 @@ public class MqttTransportContext {
@Value("${mqtt.netty.max_payload_size}") @Value("${mqtt.netty.max_payload_size}")
private Integer maxPayloadSize; private Integer maxPayloadSize;
@Value("${cluster.node_id:#{null}}")
private String nodeId;
private SslHandler sslHandler; private SslHandler sslHandler;
@PostConstruct
public void init() {
if (StringUtils.isEmpty(nodeId)) {
try {
nodeId = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
nodeId = RandomStringUtils.randomAlphabetic(10);
}
}
log.info("Current NodeId: {}", nodeId);
}
} }

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,9 +18,21 @@ package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*; import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
@ -34,13 +46,12 @@ import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil; import org.thingsboard.server.dao.EncryptionUtil;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
@ -48,18 +59,51 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate; import javax.security.cert.X509Certificate;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import org.thingsboard.server.gen.transport.TransportProtos.*;
import static io.netty.handler.codec.mqtt.MqttMessageType.*;
import static io.netty.handler.codec.mqtt.MqttQoS.*; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static org.thingsboard.server.common.msg.session.SessionMsgType.*; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static org.thingsboard.server.transport.mqtt.MqttTopics.*; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
@ -69,33 +113,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
private final DeviceSessionCtx deviceSessionCtx; private final UUID sessionId;
private final String sessionId; private final MqttTransportContext context;
private final MqttTransportAdaptor adaptor; private final MqttTransportAdaptor adaptor;
private final SessionMsgProcessor processor; private final TransportService transportService;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final QuotaService quotaService; private final QuotaService quotaService;
private final SslHandler sslHandler; private final SslHandler sslHandler;
private final ConcurrentMap<String, Integer> mqttQoSMap; private final ConcurrentMap<String, Integer> mqttQoSMap;
private volatile boolean connected; private final SessionInfoProto sessionInfo;
private volatile InetSocketAddress address; private volatile InetSocketAddress address;
private volatile DeviceSessionCtx deviceSessionCtx;
private volatile GatewaySessionCtx gatewaySessionCtx; private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, public MqttTransportHandler(MqttTransportContext context) {
MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { this.sessionId = UUID.randomUUID();
this.processor = processor; this.context = context;
this.deviceService = deviceService; this.transportService = context.getTransportService();
this.relationService = relationService; this.adaptor = context.getAdaptor();
this.authService = authService; this.quotaService = context.getQuotaService();
this.adaptor = adaptor; this.sslHandler = context.getSslHandler();
this.mqttQoSMap = new ConcurrentHashMap<>(); this.mqttQoSMap = new ConcurrentHashMap<>();
this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap); this.sessionInfo = SessionInfoProto.newBuilder()
this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); .setNodeId(context.getNodeId())
this.sslHandler = sslHandler; .setSessionIdMSB(sessionId.getMostSignificantBits())
this.quotaService = quotaService; .setSessionIdLSB(sessionId.getLeastSignificantBits())
.build();
this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
} }
@Override @Override
@ -127,196 +172,196 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case CONNECT: case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg); processConnect(ctx, (MqttConnectMessage) msg);
break; break;
case PUBLISH: // case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg); // processPublish(ctx, (MqttPublishMessage) msg);
break; // break;
case SUBSCRIBE: // case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg); // processSubscribe(ctx, (MqttSubscribeMessage) msg);
break; // break;
case UNSUBSCRIBE: // case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); // processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break; // break;
case PINGREQ: // case PINGREQ:
if (checkConnected(ctx)) { // if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); // ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
} // }
break; // break;
case DISCONNECT: // case DISCONNECT:
if (checkConnected(ctx)) { // if (checkConnected(ctx)) {
processDisconnect(ctx); // processDisconnect(ctx);
} // }
break; // break;
default: default:
break; break;
} }
} }
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { // private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
if (!checkConnected(ctx)) { // if (!checkConnected(ctx)) {
return; // return;
} // }
String topicName = mqttMsg.variableHeader().topicName(); // String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().messageId(); // int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); // log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
//
if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) { // if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionCtx != null) { // if (gatewaySessionCtx != null) {
gatewaySessionCtx.setChannel(ctx); // gatewaySessionCtx.setChannel(ctx);
handleMqttPublishMsg(topicName, msgId, mqttMsg); // handleMqttPublishMsg(topicName, msgId, mqttMsg);
} // }
} else { // } else {
processDevicePublish(ctx, mqttMsg, topicName, msgId); // processDevicePublish(ctx, mqttMsg, topicName, msgId);
} // }
} // }
//
private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { // private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
try { // try {
switch (topicName) { // switch (topicName) {
case GATEWAY_TELEMETRY_TOPIC: // case GATEWAY_TELEMETRY_TOPIC:
gatewaySessionCtx.onDeviceTelemetry(mqttMsg); // gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
break; // break;
case GATEWAY_ATTRIBUTES_TOPIC: // case GATEWAY_ATTRIBUTES_TOPIC:
gatewaySessionCtx.onDeviceAttributes(mqttMsg); // gatewaySessionCtx.onDeviceAttributes(mqttMsg);
break; // break;
case GATEWAY_ATTRIBUTES_REQUEST_TOPIC: // case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); // gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
break; // break;
case GATEWAY_RPC_TOPIC: // case GATEWAY_RPC_TOPIC:
gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); // gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
break; // break;
case GATEWAY_CONNECT_TOPIC: // case GATEWAY_CONNECT_TOPIC:
gatewaySessionCtx.onDeviceConnect(mqttMsg); // gatewaySessionCtx.onDeviceConnect(mqttMsg);
break; // break;
case GATEWAY_DISCONNECT_TOPIC: // case GATEWAY_DISCONNECT_TOPIC:
gatewaySessionCtx.onDeviceDisconnect(mqttMsg); // gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
break; // break;
} // }
} catch (RuntimeException | AdaptorException e) { // } catch (RuntimeException | AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); // log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
} // }
} // }
//
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { // private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
AdaptorToSessionActorMsg msg = null; // AdaptorToSessionActorMsg msg = null;
try { // try {
if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { // if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg); // msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
} else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { // } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg); // msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
} else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { // } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg); // msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
if (msgId >= 0) { // if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId)); // ctx.writeAndFlush(createMqttPubAckMsg(msgId));
} // }
} else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) { // } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg); // msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
if (msgId >= 0) { // if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId)); // ctx.writeAndFlush(createMqttPubAckMsg(msgId));
} // }
} else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) { // } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg); // msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
if (msgId >= 0) { // if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId)); // ctx.writeAndFlush(createMqttPubAckMsg(msgId));
} // }
} // }
} catch (AdaptorException e) { // } catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); // log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
} // }
if (msg != null) { // if (msg != null) {
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else { // } else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); // log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close(); // ctx.close();
} // }
} // }
//
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { // private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx)) { // if (!checkConnected(ctx)) {
return; // return;
} // }
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); // log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>(); // List<Integer> grantedQoSList = new ArrayList<>();
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { // for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName(); // String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService(); // MqttQoS reqQoS = subscription.qualityOfService();
try { // try {
switch (topic) { // switch (topic) {
case DEVICE_ATTRIBUTES_TOPIC: { // case DEVICE_ATTRIBUTES_TOPIC: {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
registerSubQoS(topic, grantedQoSList, reqQoS); // registerSubQoS(topic, grantedQoSList, reqQoS);
break; // break;
} // }
case DEVICE_RPC_REQUESTS_SUB_TOPIC: { // case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
registerSubQoS(topic, grantedQoSList, reqQoS); // registerSubQoS(topic, grantedQoSList, reqQoS);
break; // break;
} // }
case DEVICE_RPC_RESPONSE_SUB_TOPIC: // case DEVICE_RPC_RESPONSE_SUB_TOPIC:
case GATEWAY_ATTRIBUTES_TOPIC: // case GATEWAY_ATTRIBUTES_TOPIC:
case GATEWAY_RPC_TOPIC: // case GATEWAY_RPC_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS); // registerSubQoS(topic, grantedQoSList, reqQoS);
break; // break;
case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: // case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
deviceSessionCtx.setAllowAttributeResponses(); // deviceSessionCtx.setAllowAttributeResponses();
registerSubQoS(topic, grantedQoSList, reqQoS); // registerSubQoS(topic, grantedQoSList, reqQoS);
break; // break;
default: // default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); // log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value()); // grantedQoSList.add(FAILURE.value());
break; // break;
} // }
} catch (AdaptorException e) { // } catch (AdaptorException e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); // log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value()); // grantedQoSList.add(FAILURE.value());
} // }
} // }
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); // ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
} // }
//
private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) { // private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
grantedQoSList.add(getMinSupportedQos(reqQoS)); // grantedQoSList.add(getMinSupportedQos(reqQoS));
mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); // mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
} // }
//
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { // private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
if (!checkConnected(ctx)) { // if (!checkConnected(ctx)) {
return; // return;
} // }
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); // log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
for (String topicName : mqttMsg.payload().topics()) { // for (String topicName : mqttMsg.payload().topics()) {
mqttQoSMap.remove(topicName); // mqttQoSMap.remove(topicName);
try { // try {
switch (topicName) { // switch (topicName) {
case DEVICE_ATTRIBUTES_TOPIC: { // case DEVICE_ATTRIBUTES_TOPIC: {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
break; // break;
} // }
case DEVICE_RPC_REQUESTS_SUB_TOPIC: { // case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); // AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
break; // break;
} // }
case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: // case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
deviceSessionCtx.setDisallowAttributeResponses(); // deviceSessionCtx.setDisallowAttributeResponses();
break; // break;
} // }
} catch (AdaptorException e) { // } catch (AdaptorException e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); // log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
} // }
} // }
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); // ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
} // }
//
private MqttMessage createUnSubAckMessage(int msgId) { // private MqttMessage createUnSubAckMessage(int msgId) {
MqttFixedHeader mqttFixedHeader = // MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0); // new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); // MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); // return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
} // }
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@ -333,36 +378,58 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (StringUtils.isEmpty(userName)) { if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close(); ctx.close();
} else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
} else { } else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
connected = true; new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), @Override
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
checkGatewaySession(); if (!msg.hasDeviceInfo()) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo());
transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
checkGatewaySession();
}
}
@Override
public void onError(Exception e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
ctx.close();
}
});
} }
} }
protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
return SessionEventMsg.newBuilder()
.setSessionInfo(sessionInfo)
.setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
.setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
.setEvent(event).build();
}
private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
try { // try {
String strCert = SslUtil.getX509CertificateString(cert); // String strCert = SslUtil.getX509CertificateString(cert);
String sha3Hash = EncryptionUtil.getSha3Hash(strCert); // String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) { // if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); // ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
connected = true; // connected = true;
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), // processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); // new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
checkGatewaySession(); // checkGatewaySession();
} else { // } else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); // ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close(); // ctx.close();
} // }
} catch (Exception e) { // } catch (Exception e) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); // ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close(); // ctx.close();
} // }
} }
private X509Certificate getX509Certificate() { private X509Certificate getX509Certificate() {
@ -380,8 +447,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) { private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close(); ctx.close();
if (connected) { if (deviceSessionCtx.isConnected()) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
if (gatewaySessionCtx != null) { if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect(); gatewaySessionCtx.onGatewayDisconnect();
} }
@ -428,7 +495,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
private boolean checkConnected(ChannelHandlerContext ctx) { private boolean checkConnected(ChannelHandlerContext ctx) {
if (connected) { if (deviceSessionCtx.isConnected()) {
return true; return true;
} else { } else {
log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId); log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
@ -438,18 +505,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
private void checkGatewaySession() { private void checkGatewaySession() {
Device device = deviceSessionCtx.getDevice(); DeviceInfoProto device = deviceSessionCtx.getDeviceInfo();
JsonNode infoNode = device.getAdditionalInfo(); try {
if (infoNode != null) { JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
JsonNode gatewayNode = infoNode.get("gateway"); if (infoNode != null) {
if (gatewayNode != null && gatewayNode.asBoolean()) { JsonNode gatewayNode = infoNode.get("gateway");
gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx); if (gatewayNode != null && gatewayNode.asBoolean()) {
gatewaySessionCtx = new GatewaySessionCtx(deviceSessionCtx);
}
} }
} catch (IOException e) {
log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
} }
} }
@Override @Override
public void operationComplete(Future<? super Void> future) throws Exception { public void operationComplete(Future<? super Void> future) throws Exception {
processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId())); transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
} }
} }

View File

@ -40,15 +40,13 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private final MqttTransportAdaptor adaptor;
private final MqttSessionId sessionId; private final MqttSessionId sessionId;
private ChannelHandlerContext channel; private ChannelHandlerContext channel;
private volatile boolean allowAttributeResponses; private volatile boolean allowAttributeResponses;
private AtomicInteger msgIdSeq = new AtomicInteger(0); private AtomicInteger msgIdSeq = new AtomicInteger(0);
public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) { public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
super(processor, authService, mqttQoSMap); super(null, null, null);
this.adaptor = adaptor;
this.sessionId = new MqttSessionId(); this.sessionId = new MqttSessionId();
} }

View File

@ -78,6 +78,10 @@ public class GatewaySessionCtx {
this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
} }
public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
}
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = getJson(msg); JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json)); String deviceName = checkDeviceName(getDeviceName(json));

View File

@ -38,6 +38,7 @@
<module>http</module> <module>http</module>
<module>coap</module> <module>coap</module>
<module>mqtt</module> <module>mqtt</module>
<module>mqtt-transport</module>
</modules> </modules>
<dependencies> <dependencies>