From bc591ce09fd718222a26ca89fc69f1304c0cada0 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 4 Oct 2018 16:36:46 +0300 Subject: [PATCH] tmp commit to merge with master --- common/transport/pom.xml | 13 +++ .../common/transport/TransportService.java | 16 ++++ .../transport/src/main/proto/transport.proto | 79 +++++++++++++++++++ .../mqtt/MqttSslHandlerProvider.java | 1 - .../transport/mqtt/MqttTransportContext.java | 38 +++++++++ .../mqtt/MqttTransportServerInitializer.java | 33 +++----- .../transport/mqtt/MqttTransportService.java | 36 ++------- 7 files changed, 161 insertions(+), 55 deletions(-) create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java create mode 100644 common/transport/src/main/proto/transport.proto create mode 100644 transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java diff --git a/common/transport/pom.xml b/common/transport/pom.xml index a349012fce..b8a7ab53f0 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -86,6 +86,19 @@ org.apache.commons commons-lang3 + + com.google.protobuf + protobuf-java + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + + diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java new file mode 100644 index 0000000000..9ff19b7111 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -0,0 +1,16 @@ +package org.thingsboard.server.common.transport; + +import org.thingsboard.server.gen.transport.TransportProtos; + +/** + * Created by ashvayka on 04.10.18. + */ +public interface TransportService { + + void process(TransportProtos.SessionEventMsg msg); + + void process(TransportProtos.PostTelemetryMsg msg); + + void process(TransportProtos.PostAttributeMsg msg); + +} diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto new file mode 100644 index 0000000000..d77e79bbd0 --- /dev/null +++ b/common/transport/src/main/proto/transport.proto @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package transport; + +option java_package = "org.thingsboard.server.gen.transport"; +option java_outer_classname = "TransportProtos"; + +/** + * Data Structures; + */ +message SessionInfoProto { + string nodeId = 1; + int64 sessionIdMSB = 2; + int64 sessionIdLSB = 3; +} + +enum SessionEvent { + OPEN = 0; + CLOSED = 1; +} + +message KeyValueProto { + string key = 1; + bool bool_v = 2; + int64 long_v = 3; + double double_v = 4; + string string_v = 5; +} + +message TsKvListProto { + int64 ts = 1; + repeated KeyValueProto kv = 2; +} + +/** + * Messages that use Data Structures; + */ +message SessionEventMsg { + SessionInfoProto sessionInfo = 1; + int64 deviceIdMSB = 2; + int64 deviceIdLSB = 3; +} + +message PostTelemetryMsg { + SessionInfoProto sessionInfo = 1; + repeated TsKvListProto tsKvList = 2; +} + +message PostAttributeMsg { + SessionInfoProto sessionInfo = 1; + repeated TsKvListProto tsKvList = 2; +} + +message GetAttributeRequestMsg { + SessionInfoProto sessionInfo = 1; + repeated string clientAttributeNames = 2; + repeated string sharedAttributeNames = 3; +} + +message GetAttributeResponseMsg { + SessionInfoProto sessionInfo = 1; + repeated TsKvListProto clientAttributeList = 2; + repeated TsKvListProto sharedAttributeList = 3; + repeated string deletedAttributeKeys = 4; +} \ No newline at end of file diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java index 9c458fd39f..70ed5013e4 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java @@ -60,7 +60,6 @@ public class MqttSslHandlerProvider { @Autowired private DeviceCredentialsService deviceCredentialsService; - public SslHandler getSslHandler() { try { URL ksUrl = Resources.getResource(keyStoreFile); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java new file mode 100644 index 0000000000..43e2a3440e --- /dev/null +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -0,0 +1,38 @@ +package org.thingsboard.server.transport.mqtt; + +import io.netty.handler.ssl.SslHandler; +import lombok.Data; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; +import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; + +/** + * Created by ashvayka on 04.10.18. + */ +@Component +@Data +public class MqttTransportContext { + + @Autowired + @Lazy + private TransportService transportService; + + @Autowired(required = false) + private MqttSslHandlerProvider sslHandlerProvider; + + @Autowired(required = false) + private HostRequestsQuotaService quotaService; + + @Autowired + private MqttTransportAdaptor adaptor; + + @Value("${mqtt.netty.max_payload_size}") + private Integer maxPayloadSize; + + private SslHandler sslHandler; + +} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index 432966eeff..102a0d9f07 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.ssl.SslHandler; import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.device.DeviceService; @@ -33,41 +34,25 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; */ public class MqttTransportServerInitializer extends ChannelInitializer { - private final SessionMsgProcessor processor; - private final DeviceService deviceService; - private final DeviceAuthService authService; - private final RelationService relationService; - private final MqttTransportAdaptor adaptor; - private final MqttSslHandlerProvider sslHandlerProvider; - private final QuotaService quotaService; - private final int maxPayloadSize; + private final MqttTransportContext context; - public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, - MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider, - QuotaService quotaService, int maxPayloadSize) { - this.processor = processor; - this.deviceService = deviceService; - this.authService = authService; - this.relationService = relationService; - this.adaptor = adaptor; - this.sslHandlerProvider = sslHandlerProvider; - this.quotaService = quotaService; - this.maxPayloadSize = maxPayloadSize; + public MqttTransportServerInitializer(MqttTransportContext context) { + this.context = context; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; - if (sslHandlerProvider != null) { - sslHandler = sslHandlerProvider.getSslHandler(); + if (context.getSslHandlerProvider() != null) { + sslHandler = context.getSslHandlerProvider().getSslHandler(); pipeline.addLast(sslHandler); + context.setSslHandler(sslHandler); } - pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize)); + pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize())); pipeline.addLast("encoder", MqttEncoder.INSTANCE); - MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, - adaptor, sslHandler, quotaService); + MqttTransportHandler handler = new MqttTransportHandler(context); pipeline.addLast(handler); ch.closeFuture().addListener(handler); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java index bb8d4ad54c..f435cc2dc5 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java @@ -23,11 +23,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.ResourceLeakDetector; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import org.thingsboard.server.dao.device.DeviceService; @@ -48,27 +51,6 @@ public class MqttTransportService { private static final String V1 = "v1"; private static final String DEVICE = "device"; - @Autowired(required = false) - private ApplicationContext appContext; - - @Autowired(required = false) - private SessionMsgProcessor processor; - - @Autowired(required = false) - private DeviceService deviceService; - - @Autowired(required = false) - private DeviceAuthService authService; - - @Autowired(required = false) - private RelationService relationService; - - @Autowired(required = false) - private MqttSslHandlerProvider sslHandlerProvider; - - @Autowired(required = false) - private HostRequestsQuotaService quotaService; - @Value("${mqtt.bind_address}") private String host; @Value("${mqtt.bind_port}") @@ -82,10 +64,9 @@ public class MqttTransportService { private Integer bossGroupThreadCount; @Value("${mqtt.netty.worker_group_thread_count}") private Integer workerGroupThreadCount; - @Value("${mqtt.netty.max_payload_size}") - private Integer maxPayloadSize; - private MqttTransportAdaptor adaptor; + @Autowired + private MqttTransportContext context; private Channel serverChannel; private EventLoopGroup bossGroup; @@ -97,17 +78,12 @@ public class MqttTransportService { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); log.info("Starting MQTT transport..."); - log.info("Lookup MQTT transport adaptor {}", adaptorName); - this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); - - log.info("Starting MQTT transport server"); bossGroup = new NioEventLoopGroup(bossGroupThreadCount); workerGroup = new NioEventLoopGroup(workerGroupThreadCount); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, - adaptor, sslHandlerProvider, quotaService, maxPayloadSize)); + .childHandler(new MqttTransportServerInitializer(context)); serverChannel = b.bind(host, port).sync().channel(); log.info("Mqtt transport started!");