tmp commit to merge with master

This commit is contained in:
Andrew Shvayka 2018-10-04 16:36:46 +03:00
parent f7f6045800
commit bc591ce09f
7 changed files with 161 additions and 55 deletions

View File

@ -86,6 +86,19 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -0,0 +1,16 @@
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos;
/**
* Created by ashvayka on 04.10.18.
*/
public interface TransportService {
void process(TransportProtos.SessionEventMsg msg);
void process(TransportProtos.PostTelemetryMsg msg);
void process(TransportProtos.PostAttributeMsg msg);
}

View File

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package transport;
option java_package = "org.thingsboard.server.gen.transport";
option java_outer_classname = "TransportProtos";
/**
* Data Structures;
*/
message SessionInfoProto {
string nodeId = 1;
int64 sessionIdMSB = 2;
int64 sessionIdLSB = 3;
}
enum SessionEvent {
OPEN = 0;
CLOSED = 1;
}
message KeyValueProto {
string key = 1;
bool bool_v = 2;
int64 long_v = 3;
double double_v = 4;
string string_v = 5;
}
message TsKvListProto {
int64 ts = 1;
repeated KeyValueProto kv = 2;
}
/**
* Messages that use Data Structures;
*/
message SessionEventMsg {
SessionInfoProto sessionInfo = 1;
int64 deviceIdMSB = 2;
int64 deviceIdLSB = 3;
}
message PostTelemetryMsg {
SessionInfoProto sessionInfo = 1;
repeated TsKvListProto tsKvList = 2;
}
message PostAttributeMsg {
SessionInfoProto sessionInfo = 1;
repeated TsKvListProto tsKvList = 2;
}
message GetAttributeRequestMsg {
SessionInfoProto sessionInfo = 1;
repeated string clientAttributeNames = 2;
repeated string sharedAttributeNames = 3;
}
message GetAttributeResponseMsg {
SessionInfoProto sessionInfo = 1;
repeated TsKvListProto clientAttributeList = 2;
repeated TsKvListProto sharedAttributeList = 3;
repeated string deletedAttributeKeys = 4;
}

View File

@ -60,7 +60,6 @@ public class MqttSslHandlerProvider {
@Autowired @Autowired
private DeviceCredentialsService deviceCredentialsService; private DeviceCredentialsService deviceCredentialsService;
public SslHandler getSslHandler() { public SslHandler getSslHandler() {
try { try {
URL ksUrl = Resources.getResource(keyStoreFile); URL ksUrl = Resources.getResource(keyStoreFile);

View File

@ -0,0 +1,38 @@
package org.thingsboard.server.transport.mqtt;
import io.netty.handler.ssl.SslHandler;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
/**
* Created by ashvayka on 04.10.18.
*/
@Component
@Data
public class MqttTransportContext {
@Autowired
@Lazy
private TransportService transportService;
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@Autowired(required = false)
private HostRequestsQuotaService quotaService;
@Autowired
private MqttTransportAdaptor adaptor;
@Value("${mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
private SslHandler sslHandler;
}

View File

@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
@ -33,41 +34,25 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
*/ */
public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> { public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
private final SessionMsgProcessor processor; private final MqttTransportContext context;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
private final QuotaService quotaService;
private final int maxPayloadSize;
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, public MqttTransportServerInitializer(MqttTransportContext context) {
MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider, this.context = context;
QuotaService quotaService, int maxPayloadSize) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
this.quotaService = quotaService;
this.maxPayloadSize = maxPayloadSize;
} }
@Override @Override
public void initChannel(SocketChannel ch) { public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = null; SslHandler sslHandler = null;
if (sslHandlerProvider != null) { if (context.getSslHandlerProvider() != null) {
sslHandler = sslHandlerProvider.getSslHandler(); sslHandler = context.getSslHandlerProvider().getSslHandler();
pipeline.addLast(sslHandler); pipeline.addLast(sslHandler);
context.setSslHandler(sslHandler);
} }
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize)); pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, MqttTransportHandler handler = new MqttTransportHandler(context);
adaptor, sslHandler, quotaService);
pipeline.addLast(handler); pipeline.addLast(handler);
ch.closeFuture().addListener(handler); ch.closeFuture().addListener(handler);

View File

@ -23,11 +23,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
@ -48,27 +51,6 @@ public class MqttTransportService {
private static final String V1 = "v1"; private static final String V1 = "v1";
private static final String DEVICE = "device"; private static final String DEVICE = "device";
@Autowired(required = false)
private ApplicationContext appContext;
@Autowired(required = false)
private SessionMsgProcessor processor;
@Autowired(required = false)
private DeviceService deviceService;
@Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
private RelationService relationService;
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@Autowired(required = false)
private HostRequestsQuotaService quotaService;
@Value("${mqtt.bind_address}") @Value("${mqtt.bind_address}")
private String host; private String host;
@Value("${mqtt.bind_port}") @Value("${mqtt.bind_port}")
@ -82,10 +64,9 @@ public class MqttTransportService {
private Integer bossGroupThreadCount; private Integer bossGroupThreadCount;
@Value("${mqtt.netty.worker_group_thread_count}") @Value("${mqtt.netty.worker_group_thread_count}")
private Integer workerGroupThreadCount; private Integer workerGroupThreadCount;
@Value("${mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
private MqttTransportAdaptor adaptor; @Autowired
private MqttTransportContext context;
private Channel serverChannel; private Channel serverChannel;
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
@ -97,17 +78,12 @@ public class MqttTransportService {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport..."); log.info("Starting MQTT transport...");
log.info("Lookup MQTT transport adaptor {}", adaptorName);
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
log.info("Starting MQTT transport server");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount); bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount); workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, .childHandler(new MqttTransportServerInitializer(context));
adaptor, sslHandlerProvider, quotaService, maxPayloadSize));
serverChannel = b.bind(host, port).sync().channel(); serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!"); log.info("Mqtt transport started!");