From caf38f675594a08c66619058c4dfa9d757e7417b Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 5 Oct 2018 18:52:59 +0300 Subject: [PATCH] Some refactoring --- application/pom.xml | 2 +- .../server/kafka/TBKafkaConsumerTemplate.java | 3 +- .../server/kafka/TBKafkaProducerTemplate.java | 30 +- .../server/kafka/TbKafkaHandler.java | 12 + .../server/kafka/TbKafkaRequestTemplate.java | 15 +- .../server/kafka/TbKafkaResponseTemplate.java | 173 +++++++++++ common/transport/pom.xml | 5 + .../common/transport/TransportService.java | 15 + .../transport/TransportServiceCallback.java | 17 +- .../tenant/TenantIntervalRegistryCleaner.java | 2 + .../tenant/TenantIntervalRegistryLogger.java | 2 + .../tenant/TenantMsgsIntervalRegistry.java | 2 + .../quota/tenant/TenantQuotaService.java | 2 + .../tenant/TenantRequestLimitPolicy.java | 2 + .../session/DeviceAwareSessionContext.java | 8 +- .../transport/src/main/proto/transport.proto | 14 + .../quota/HostRequestsQuotaServiceTest.java | 6 +- pom.xml | 2 +- transport/{mqtt => mqtt-common}/pom.xml | 4 +- .../mqtt/MqttSslHandlerProvider.java | 0 .../server/transport/mqtt/MqttTopics.java | 0 .../transport/mqtt/MqttTransportContext.java | 0 .../transport/mqtt/MqttTransportHandler.java | 6 +- .../mqtt/MqttTransportServerInitializer.java | 0 .../transport/mqtt/MqttTransportService.java | 0 .../mqtt/adaptors/JsonMqttAdaptor.java | 0 .../mqtt/adaptors/MqttTransportAdaptor.java | 0 .../mqtt/session/DeviceSessionCtx.java | 12 +- .../mqtt/session/GatewayDeviceSessionCtx.java | 6 +- .../mqtt/session/GatewaySessionCtx.java | 284 ++++++++++++++++++ .../MqttDeviceAwareSessionContext.java | 4 +- .../transport/mqtt/session/MqttSessionId.java | 0 .../server/transport/mqtt/util/SslUtil.java | 0 transport/mqtt-transport/pom.xml | 28 +- .../ThingsboardMqttTransportApplication.java | 3 +- .../mqtt/service/AsyncCallbackTemplate.java | 45 +++ .../mqtt/service/MqttTransportService.java | 110 +++++++ .../service/TransportApiRequestEncoder.java | 14 + .../service/TransportApiResponseDecoder.java | 16 + .../src/main/resources/tb-mqtt-transport.yml | 34 ++- .../mqtt/session/GatewaySessionCtx.java | 280 ----------------- transport/pom.xml | 2 +- 42 files changed, 805 insertions(+), 355 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java rename transport/{mqtt => mqtt-common}/pom.xml (97%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java (99%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java (93%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java (97%) create mode 100644 transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java (95%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java (100%) rename transport/{mqtt => mqtt-common}/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java (100%) create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java delete mode 100644 transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java diff --git a/application/pom.xml b/application/pom.xml index c4766f2ba8..bff96ec6fd 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -70,7 +70,7 @@ org.thingsboard.transport - mqtt + mqtt-common org.thingsboard diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java index 397226479e..3adb1c368d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java @@ -43,7 +43,8 @@ public class TBKafkaConsumerTemplate { private final String topic; @Builder - private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, TbKafkaRequestIdExtractor requestIdExtractor, + private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, + TbKafkaRequestIdExtractor requestIdExtractor, String clientId, String groupId, String topic, boolean autoCommit, int autoCommitIntervalMs) { Properties props = settings.toProps(); diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 7e24ad00a6..2611e9491a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,13 +27,12 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.header.Header; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; -import java.util.function.BiConsumer; /** * Created by ashvayka on 24.09.18. @@ -48,7 +47,7 @@ public class TBKafkaProducerTemplate { private TbKafkaEnricher enricher = ((value, responseTopic, requestId) -> value); private final TbKafkaPartitioner partitioner; - private List partitionInfoList; + private ConcurrentMap> partitionInfoMap; @Getter private final String defaultTopic; @@ -78,11 +77,16 @@ public class TBKafkaProducerTemplate { log.trace("Failed to create topic: {}", e.getMessage(), e); } //Maybe this should not be cached, but we don't plan to change size of partitions - this.partitionInfoList = producer.partitionsFor(defaultTopic); + this.partitionInfoMap = new ConcurrentHashMap<>(); + this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic)); } - public T enrich(T value, String responseTopic, UUID requestId) { - return enricher.enrich(value, responseTopic, requestId); + T enrich(T value, String responseTopic, UUID requestId) { + if (enricher != null) { + return enricher.enrich(value, responseTopic, requestId); + } else { + return value; + } } public Future send(String key, T value) { @@ -101,7 +105,7 @@ public class TBKafkaProducerTemplate { byte[] data = encoder.encode(value); ProducerRecord record; Integer partition = getPartition(topic, key, value, data); - record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers); + record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers); return producer.send(record); } @@ -109,7 +113,7 @@ public class TBKafkaProducerTemplate { if (partitioner == null) { return null; } else { - return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList); + return partitioner.partition(topic, key, value, data, partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor)); } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java new file mode 100644 index 0000000000..66d53c3bde --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java @@ -0,0 +1,12 @@ +package org.thingsboard.server.kafka; + +import java.util.function.Consumer; + +/** + * Created by ashvayka on 05.10.18. + */ +public interface TbKafkaHandler { + + void handle(Request request, Consumer onSuccess, Consumer onFailure); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index 8a0f5293d9..30b20e709d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -93,13 +93,12 @@ public class TbKafkaRequestTemplate { ConsumerRecords responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); responses.forEach(response -> { Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); - Response decocedResponse = null; + Response decodedResponse = null; UUID requestId = null; if (requestIdHeader == null) { try { - decocedResponse = responseTemplate.decode(response); - requestId = responseTemplate.extractRequestId(decocedResponse); - + decodedResponse = responseTemplate.decode(response); + requestId = responseTemplate.extractRequestId(decodedResponse); } catch (IOException e) { log.error("Failed to decode response", e); } @@ -107,17 +106,17 @@ public class TbKafkaRequestTemplate { requestId = bytesToUuid(requestIdHeader.value()); } if (requestId == null) { - log.error("[{}] Missing requestId in header and response", response); + log.error("[{}] Missing requestId in header and body", response); } else { ResponseMetaData expectedResponse = pendingRequests.remove(requestId); if (expectedResponse == null) { log.trace("[{}] Invalid or stale request", requestId); } else { try { - if (decocedResponse == null) { - decocedResponse = responseTemplate.decode(response); + if (decodedResponse == null) { + decodedResponse = responseTemplate.decode(response); } - expectedResponse.future.set(decocedResponse); + expectedResponse.future.set(decodedResponse); } catch (IOException e) { expectedResponse.future.setException(e); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java new file mode 100644 index 0000000000..536c4b93b2 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java @@ -0,0 +1,173 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.kafka; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +/** + * Created by ashvayka on 25.09.18. + */ +@Slf4j +public class TbKafkaResponseTemplate { + + private final TBKafkaConsumerTemplate requestTemplate; + private final TBKafkaProducerTemplate responseTemplate; + private final TbKafkaHandler handler; + private final ConcurrentMap pendingRequests; + private final ExecutorService executor; + private final long maxPendingRequests; + + private final long pollInterval; + private volatile boolean stopped = false; + + @Builder + public TbKafkaResponseTemplate(TBKafkaConsumerTemplate requestTemplate, + TBKafkaProducerTemplate responseTemplate, + TbKafkaHandler handler, + long pollInterval, + long maxPendingRequests, + ExecutorService executor) { + this.requestTemplate = requestTemplate; + this.responseTemplate = responseTemplate; + this.handler = handler; + this.pendingRequests = new ConcurrentHashMap<>(); + this.maxPendingRequests = maxPendingRequests; + this.pollInterval = pollInterval; + this.executor = executor; + } + + public void init() { + this.responseTemplate.init(); + requestTemplate.subscribe(); + executor.submit(() -> { + long nextCleanupMs = 0L; + while (!stopped) { + ConsumerRecords requests = requestTemplate.poll(Duration.ofMillis(pollInterval)); + requests.forEach(request -> { + Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); + if (requestIdHeader == null) { + log.error("[{}] Missing requestId in header", request); + return; + } + UUID requestId = bytesToUuid(requestIdHeader.value()); + if (requestId == null) { + log.error("[{}] Missing requestId in header and body", request); + return; + } + Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER); + if (responseTopicHeader == null) { + log.error("[{}] Missing response topic in header", request); + return; + } + String responseTopic = bytesToUuid(responseTopicHeader.value()); + if (requestId == null) { + log.error("[{}] Missing requestId in header and body", request); + return; + } + + Request decodedRequest = null; + String responseTopic = null; + + try { + if (decodedRequest == null) { + decodedRequest = requestTemplate.decode(request); + } + executor.submit(() -> { + handler.handle(decodedRequest, ); + }); + } catch (IOException e) { + expectedRequest.future.setException(e); + } + + }); + } + }); + } + + public void stop() { + stopped = true; + } + + public ListenableFuture post(String key, Request request) { + if (tickSize > maxPendingRequests) { + return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!")); + } + UUID requestId = UUID.randomUUID(); + List

headers = new ArrayList<>(2); + headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))); + headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()))); + SettableFuture future = SettableFuture.create(); + pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); + request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); + requestTemplate.send(key, request, headers); + return future; + } + + private byte[] uuidToBytes(UUID uuid) { + ByteBuffer buf = ByteBuffer.allocate(16); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } + + private static UUID bytesToUuid(byte[] bytes) { + ByteBuffer bb = ByteBuffer.wrap(bytes); + long firstLong = bb.getLong(); + long secondLong = bb.getLong(); + return new UUID(firstLong, secondLong); + } + + private byte[] stringToBytes(String string) { + return string.getBytes(StandardCharsets.UTF_8); + } + + private String bytesToString(byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + + private static class ResponseMetaData { + private final long expTime; + private final SettableFuture future; + + ResponseMetaData(long ts, SettableFuture future) { + this.expTime = ts; + this.future = future; + } + } + +} diff --git a/common/transport/pom.xml b/common/transport/pom.xml index b8a7ab53f0..46efbf4dcc 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -78,6 +78,11 @@ org.springframework spring-context + + org.springframework.boot + spring-boot-starter-web + provided + com.google.guava guava diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java index ff0debcd4b..e72a626ca1 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.transport; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java index bfc7ac890e..5443f7ed29 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.transport; /** @@ -6,6 +21,6 @@ package org.thingsboard.server.common.transport; public interface TransportServiceCallback { void onSuccess(T msg); - void onError(Exception e); + void onError(Throwable e); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java index c48117069a..47b427840a 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java @@ -16,10 +16,12 @@ package org.thingsboard.server.common.transport.quota.tenant; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; @Component +@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner { public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry, diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java index c56f457423..3bdcf93d07 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.quota.tenant; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component +@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) public class TenantIntervalRegistryLogger extends IntervalRegistryLogger { private final long logIntervalMin; diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java index 6e8402c701..3402e62472 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java @@ -16,10 +16,12 @@ package org.thingsboard.server.common.transport.quota.tenant; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry; @Component +@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry { public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs, diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java index a68860a2e0..875a666a28 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java @@ -16,10 +16,12 @@ package org.thingsboard.server.common.transport.quota.tenant; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.quota.AbstractQuotaService; @Component +@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) public class TenantQuotaService extends AbstractQuotaService { public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy, diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java index cc32c81ad7..c5dfc40480 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java @@ -16,10 +16,12 @@ package org.thingsboard.server.common.transport.quota.tenant; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.quota.RequestLimitPolicy; @Component +@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) public class TenantRequestLimitPolicy extends RequestLimitPolicy { public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) { diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 28269454c7..7b2c05e400 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index 65d1468605..f0cc3c75ed 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -95,3 +95,17 @@ message ValidateDeviceTokenResponseMsg { DeviceInfoProto deviceInfo = 1; } +/** + * Main messages; + */ +message TransportToRuleEngineMsg { + +} + +message TransportApiRequestMsg { + ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1; +} + +message TransportApiResponseMsg { + ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1; +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java index 20f8a55d0d..37441e9d41 100644 --- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java @@ -17,7 +17,11 @@ package org.thingsboard.server.common.transport.quota; import org.junit.Before; import org.junit.Test; -import org.thingsboard.server.common.transport.quota.host.*; +import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryCleaner; +import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger; +import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry; +import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy; +import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; diff --git a/pom.xml b/pom.xml index d6db123ebe..62fda36056 100755 --- a/pom.xml +++ b/pom.xml @@ -366,7 +366,7 @@ org.thingsboard.transport - mqtt + mqtt-common ${project.version} diff --git a/transport/mqtt/pom.xml b/transport/mqtt-common/pom.xml similarity index 97% rename from transport/mqtt/pom.xml rename to transport/mqtt-common/pom.xml index 785e9a5306..6be5235a8a 100644 --- a/transport/mqtt/pom.xml +++ b/transport/mqtt-common/pom.xml @@ -24,10 +24,10 @@ transport org.thingsboard.transport - mqtt + mqtt-common jar - Thingsboard MQTT Transport + Thingsboard MQTT Transport Common https://thingsboard.io diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java similarity index 99% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fd02d3aa39..8c64e7eeb2 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -395,7 +395,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } @Override - public void onError(Exception e) { + public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, userName, e); ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); ctx.close(); @@ -521,6 +521,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void operationComplete(Future future) throws Exception { - transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); + if (deviceSessionCtx.isConnected()) { + transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); + } } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java similarity index 93% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 3f29a82fe1..6661cb12f4 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -57,12 +57,12 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { @Override public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException { - try { - adaptor.convertToAdaptorMsg(this, msg).ifPresent(this::pushToNetwork); - } catch (AdaptorException e) { - //TODO: close channel with disconnect; - logAndWrap(e); - } +// try { +// adaptor.convertToAdaptorMsg(this, msg).ifPresent(this::pushToNetwork); +// } catch (AdaptorException e) { +// //TODO: close channel with disconnect; +// logAndWrap(e); +// } } private void logAndWrap(AdaptorException e) throws SessionException { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java similarity index 97% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index e5be5c71a1..793836d793 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -135,7 +135,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { JsonObject result = new JsonObject(); result.addProperty("id", response.getRequestId()); - result.addProperty(DEVICE_PROPERTY, device.getName()); +// result.addProperty(DEVICE_PROPERTY, device.getName()); Optional responseData = response.getData(); if (responseData.isPresent()) { AttributesKVMsg msg = responseData.get(); @@ -183,14 +183,14 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { JsonObject result = new JsonObject(); - result.addProperty(DEVICE_PROPERTY, device.getName()); +// result.addProperty(DEVICE_PROPERTY, device.getName()); result.add("data", JsonConverter.toJson(data, false)); return createMqttPublishMsg(topic, result); } private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { JsonObject result = new JsonObject(); - result.addProperty(DEVICE_PROPERTY, device.getName()); +// result.addProperty(DEVICE_PROPERTY, device.getName()); result.add("data", JsonConverter.toJson(data, true)); return createMqttPublishMsg(topic, result); } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java new file mode 100644 index 0000000000..fafbc4ec01 --- /dev/null +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -0,0 +1,284 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.session; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.msg.core.*; +import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; +import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; +import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; + +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; + +/** + * Created by ashvayka on 19.01.17. + */ +@Slf4j +public class GatewaySessionCtx { + + private static final String DEFAULT_DEVICE_TYPE = "default"; + public static final String CAN_T_PARSE_VALUE = "Can't parse value: "; + public static final String DEVICE_PROPERTY = "device"; +// private final Device gateway; +// private final SessionId gatewaySessionId; +// private final SessionMsgProcessor processor; +// private final DeviceService deviceService; +// private final DeviceAuthService authService; +// private final RelationService relationService; +// private final Map devices; +// private final ConcurrentMap mqttQoSMap; + private ChannelHandlerContext channel; + +// public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { +// this.processor = processor; +// this.deviceService = deviceService; +// this.authService = authService; +// this.relationService = relationService; +// this.gateway = gatewaySessionCtx.getDevice(); +// this.gatewaySessionId = gatewaySessionCtx.getSessionId(); +// this.devices = new HashMap<>(); +// this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); +// } + + public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) { + + } + + public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { + JsonElement json = getJson(msg); + String deviceName = checkDeviceName(getDeviceName(json)); + String deviceType = getDeviceType(json); + onDeviceConnect(deviceName, deviceType); + ack(msg); + } + + private void onDeviceConnect(String deviceName, String deviceType) { +// if (!devices.containsKey(deviceName)) { +// Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName); +// if (device == null) { +// device = new Device(); +// device.setTenantId(gateway.getTenantId()); +// device.setName(deviceName); +// device.setType(deviceType); +// device.setCustomerId(gateway.getCustomerId()); +// device = deviceService.saveDevice(device); +// relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); +// processor.onDeviceAdded(device); +// } +// GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap); +// devices.put(deviceName, ctx); +// log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName); +// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); +// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); +// } + } + + public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { +// String deviceName = checkDeviceName(getDeviceName(getJson(msg))); +// GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); +// if (deviceSessionCtx != null) { +// processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); +// deviceSessionCtx.setClosed(true); +// log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName); +// } else { +// log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName); +// } +// ack(msg); + } + + public void onGatewayDisconnect() { +// devices.forEach((k, v) -> { +// processor.process(SessionCloseMsg.onDisconnect(v.getSessionId())); +// }); + } + + public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { +// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); +// int requestId = mqttMsg.variableHeader().messageId(); +// if (json.isJsonObject()) { +// JsonObject jsonObj = json.getAsJsonObject(); +// for (Map.Entry deviceEntry : jsonObj.entrySet()) { +// String deviceName = checkDeviceConnected(deviceEntry.getKey()); +// if (!deviceEntry.getValue().isJsonArray()) { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } +// BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); +// JsonArray deviceData = deviceEntry.getValue().getAsJsonArray(); +// for (JsonElement element : deviceData) { +// JsonConverter.parseWithTs(request, element.getAsJsonObject()); +// } +// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), +// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); +// } +// } else { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } + } + + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { +// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); +// if (json.isJsonObject()) { +// JsonObject jsonObj = json.getAsJsonObject(); +// String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString()); +// Integer requestId = jsonObj.get("id").getAsInt(); +// String data = jsonObj.get("data").toString(); +// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), +// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); +// ack(mqttMsg); +// } else { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } + } + + public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { +// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); +// int requestId = mqttMsg.variableHeader().messageId(); +// if (json.isJsonObject()) { +// JsonObject jsonObj = json.getAsJsonObject(); +// for (Map.Entry deviceEntry : jsonObj.entrySet()) { +// String deviceName = checkDeviceConnected(deviceEntry.getKey()); +// if (!deviceEntry.getValue().isJsonObject()) { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } +// long ts = System.currentTimeMillis(); +// BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId); +// JsonObject deviceData = deviceEntry.getValue().getAsJsonObject(); +// request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); +// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), +// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); +// } +// } else { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } + } + + public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException { +// JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload()); +// if (json.isJsonObject()) { +// JsonObject jsonObj = json.getAsJsonObject(); +// int requestId = jsonObj.get("id").getAsInt(); +// String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); +// boolean clientScope = jsonObj.get("client").getAsBoolean(); +// Set keys; +// if (jsonObj.has("key")) { +// keys = Collections.singleton(jsonObj.get("key").getAsString()); +// } else { +// JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); +// keys = new HashSet<>(); +// for (JsonElement keyObj : keysArray) { +// keys.add(keyObj.getAsString()); +// } +// } +// +// BasicGetAttributesRequest request; +// if (clientScope) { +// request = new BasicGetAttributesRequest(requestId, keys, null); +// } else { +// request = new BasicGetAttributesRequest(requestId, null, keys); +// } +// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); +// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), +// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); +// ack(msg); +// } else { +// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); +// } + } + + private String checkDeviceConnected(String deviceName) { +// if (!devices.containsKey(deviceName)) { +// log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName); +// onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); +// } +// return deviceName; + return null; + } + + private String checkDeviceName(String deviceName) { + if (StringUtils.isEmpty(deviceName)) { + throw new RuntimeException("Device name is empty!"); + } else { + return deviceName; + } + } + + private String getDeviceName(JsonElement json) throws AdaptorException { + return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString(); + } + + private String getDeviceType(JsonElement json) throws AdaptorException { + JsonElement type = json.getAsJsonObject().get("type"); + return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString(); + } + + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { +// return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + return null; + } + + protected SessionMsgProcessor getProcessor() { +// return processor; + return null; + } + + DeviceAuthService getAuthService() { +// return authService; + return null; + } + + public void setChannel(ChannelHandlerContext channel) { + this.channel = channel; + } + + private void ack(MqttPublishMessage msg) { + if (msg.variableHeader().messageId() > 0) { + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); + } + } + + void writeAndFlush(MqttMessage mqttMessage) { + channel.writeAndFlush(mqttMessage); + } + +} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java similarity index 95% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index f085064016..c01e103889 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -32,12 +32,12 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo private final ConcurrentMap mqttQoSMap; public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap mqttQoSMap) { - super(processor, authService); + super(); this.mqttQoSMap = mqttQoSMap; } public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap mqttQoSMap) { - super(processor, authService, device); + super(); this.mqttQoSMap = mqttQoSMap; } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java similarity index 100% rename from transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java rename to transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml index 8d8b24d6c6..012aabd4e9 100644 --- a/transport/mqtt-transport/pom.xml +++ b/transport/mqtt-transport/pom.xml @@ -41,32 +41,12 @@ transport - io.netty - netty-all + org.thingsboard.transport + mqtt-common - org.springframework - spring-context - - - org.slf4j - slf4j-api - - - org.slf4j - log4j-over-slf4j - - - ch.qos.logback - logback-core - - - ch.qos.logback - logback-classic - - - com.google.guava - guava + org.thingsboard.common + queue org.springframework.boot diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java index 3c54938a37..4740342692 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java @@ -19,14 +19,13 @@ import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; -import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.Arrays; @SpringBootConfiguration @EnableAsync @EnableScheduling -@ComponentScan({"org.thingsboard.server"}) +@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"}) public class ThingsboardMqttTransportApplication { private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name"; diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java new file mode 100644 index 0000000000..493c0c8b9f --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java @@ -0,0 +1,45 @@ +package org.thingsboard.server.mqtt.service; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import javax.annotation.Nullable; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +/** + * Created by ashvayka on 05.10.18. + */ +public class AsyncCallbackTemplate { + + public static void withCallback(ListenableFuture future, Consumer onSuccess, + Consumer onFailure) { + withCallback(future, onSuccess, onFailure, null); + } + + public static void withCallback(ListenableFuture future, Consumer onSuccess, + Consumer onFailure, Executor executor) { + FutureCallback callback = new FutureCallback() { + @Override + public void onSuccess(@Nullable T result) { + try { + onSuccess.accept(result); + } catch (Throwable th) { + onFailure(th); + } + } + + @Override + public void onFailure(Throwable t) { + onFailure.accept(t); + } + }; + if (executor != null) { + Futures.addCallback(future, callback, executor); + } else { + Futures.addCallback(future, callback); + } + } + +} diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java new file mode 100644 index 0000000000..a1641f152f --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java @@ -0,0 +1,110 @@ +package org.thingsboard.server.mqtt.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; +import org.thingsboard.server.kafka.TBKafkaProducerTemplate; +import org.thingsboard.server.kafka.TbKafkaRequestTemplate; +import org.thingsboard.server.gen.transport.TransportProtos.*; +import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.transport.mqtt.MqttTransportContext; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Created by ashvayka on 05.10.18. + */ +@Service +public class MqttTransportService implements TransportService { + + @Value("${kafka.rule-engine.topic}") + private String ruleEngineTopic; + @Value("${kafka.transport-api.requests-topic}") + private String transportApiRequestsTopic; + @Value("${kafka.transport-api.responses-topic}") + private String transportApiResponsesTopic; + @Value("${kafka.transport-api.max_pending_requests}") + private long maxPendingRequests; + @Value("${kafka.transport-api.max_requests_timeout}") + private long maxRequestsTimeout; + @Value("${kafka.transport-api.response_poll_interval}") + private int responsePollDuration; + @Value("${kafka.transport-api.response_auto_commit_interval}") + private int autoCommitInterval; + + @Autowired + private TbKafkaSettings kafkaSettings; + //We use this to get the node id. We should replace this with a component that provides the node id. + @Autowired + private MqttTransportContext transportContext; + + private ExecutorService transportCallbackExecutor; + + private TbKafkaRequestTemplate transportApiTemplate; + + @PostConstruct + public void init() { + this.transportCallbackExecutor = Executors.newCachedThreadPool(); + + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.defaultTopic(transportApiRequestsTopic); + requestBuilder.encoder(new TransportApiRequestEncoder()); + + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); + responseBuilder.settings(kafkaSettings); + responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId()); + responseBuilder.clientId(transportContext.getNodeId()); + responseBuilder.groupId("transport-node"); + responseBuilder.autoCommit(true); + responseBuilder.autoCommitIntervalMs(autoCommitInterval); + responseBuilder.decoder(new TransportApiResponseDecoder()); + + TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder + builder = TbKafkaRequestTemplate.builder(); + builder.requestTemplate(requestBuilder.build()); + builder.responseTemplate(responseBuilder.build()); + builder.maxPendingRequests(maxPendingRequests); + builder.maxRequestTimeout(maxRequestsTimeout); + builder.pollInterval(responsePollDuration); + transportApiTemplate = builder.build(); + transportApiTemplate.init(); + } + + @PreDestroy + public void destroy() { + if (transportApiTemplate != null) { + transportApiTemplate.stop(); + } + if (transportCallbackExecutor != null) { + transportCallbackExecutor.shutdownNow(); + } + } + + @Override + public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback) { + AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()), + response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); + } + + @Override + public void process(SessionEventMsg msg, TransportServiceCallback callback) { + + } + + @Override + public void process(PostTelemetryMsg msg, TransportServiceCallback callback) { + + } + + @Override + public void process(PostAttributeMsg msg, TransportServiceCallback callback) { + + } +} diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java new file mode 100644 index 0000000000..f931db6a2e --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java @@ -0,0 +1,14 @@ +package org.thingsboard.server.mqtt.service; + +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.kafka.TbKafkaEncoder; + +/** + * Created by ashvayka on 05.10.18. + */ +public class TransportApiRequestEncoder implements TbKafkaEncoder { + @Override + public byte[] encode(TransportApiRequestMsg value) { + return value.toByteArray(); + } +} diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java new file mode 100644 index 0000000000..22e16478ae --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java @@ -0,0 +1,16 @@ +package org.thingsboard.server.mqtt.service; + +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.kafka.TbKafkaDecoder; + +import java.io.IOException; + +/** + * Created by ashvayka on 05.10.18. + */ +public class TransportApiResponseDecoder implements TbKafkaDecoder { + @Override + public TransportApiResponseMsg decode(byte[] data) throws IOException { + return TransportApiResponseMsg.parseFrom(data); + } +} diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml index c12f25edcf..a2a0d54adc 100644 --- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml @@ -44,6 +44,27 @@ mqtt: # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" +#Quota parameters +quota: + host: + # Max allowed number of API requests in interval for single host + limit: "${QUOTA_HOST_LIMIT:10000}" + # Interval duration + intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}" + # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs + ttlMs: "${QUOTA_HOST_TTL_MS:60000}" + # Interval for scheduled task that cleans expired records. TTL is used for expiring + cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}" + # Enable Host API Limits + enabled: "${QUOTA_HOST_ENABLED:false}" + # Array of whitelist hosts + whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}" + # Array of blacklist hosts + blacklist: "${QUOTA_HOST_BLACKLIST:}" + log: + topSize: 10 + intervalMin: 2 + kafka: enabled: true bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" @@ -52,6 +73,13 @@ kafka: batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" linger.ms: "${TB_KAFKA_LINGER_MS:1}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" - topic: - telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}" - requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}" \ No newline at end of file + transport-api: + requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" + responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" + max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}" + max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" + response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" + response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" + # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted + rule-engine: + topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}" diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java deleted file mode 100644 index dec86028b7..0000000000 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.session; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonNull; -import com.google.gson.JsonObject; -import com.google.gson.JsonSyntaxException; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import lombok.extern.slf4j.Slf4j; -import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.msg.core.*; -import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.dao.relation.RelationService; -import org.thingsboard.server.transport.mqtt.MqttTransportHandler; -import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; - -import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; - -import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; - -/** - * Created by ashvayka on 19.01.17. - */ -@Slf4j -public class GatewaySessionCtx { - - private static final String DEFAULT_DEVICE_TYPE = "default"; - public static final String CAN_T_PARSE_VALUE = "Can't parse value: "; - public static final String DEVICE_PROPERTY = "device"; - private final Device gateway; - private final SessionId gatewaySessionId; - private final SessionMsgProcessor processor; - private final DeviceService deviceService; - private final DeviceAuthService authService; - private final RelationService relationService; - private final Map devices; - private final ConcurrentMap mqttQoSMap; - private ChannelHandlerContext channel; - - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { - this.processor = processor; - this.deviceService = deviceService; - this.authService = authService; - this.relationService = relationService; - this.gateway = gatewaySessionCtx.getDevice(); - this.gatewaySessionId = gatewaySessionCtx.getSessionId(); - this.devices = new HashMap<>(); - this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); - } - - public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) { - - } - - public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { - JsonElement json = getJson(msg); - String deviceName = checkDeviceName(getDeviceName(json)); - String deviceType = getDeviceType(json); - onDeviceConnect(deviceName, deviceType); - ack(msg); - } - - private void onDeviceConnect(String deviceName, String deviceType) { - if (!devices.containsKey(deviceName)) { - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName); - if (device == null) { - device = new Device(); - device.setTenantId(gateway.getTenantId()); - device.setName(deviceName); - device.setType(deviceType); - device.setCustomerId(gateway.getCustomerId()); - device = deviceService.saveDevice(device); - relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); - processor.onDeviceAdded(device); - } - GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap); - devices.put(deviceName, ctx); - log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName); - processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); - processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); - } - } - - public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { - String deviceName = checkDeviceName(getDeviceName(getJson(msg))); - GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); - if (deviceSessionCtx != null) { - processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); - deviceSessionCtx.setClosed(true); - log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName); - } else { - log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName); - } - ack(msg); - } - - public void onGatewayDisconnect() { - devices.forEach((k, v) -> { - processor.process(SessionCloseMsg.onDisconnect(v.getSessionId())); - }); - } - - public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { - JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); - int requestId = mqttMsg.variableHeader().messageId(); - if (json.isJsonObject()) { - JsonObject jsonObj = json.getAsJsonObject(); - for (Map.Entry deviceEntry : jsonObj.entrySet()) { - String deviceName = checkDeviceConnected(deviceEntry.getKey()); - if (!deviceEntry.getValue().isJsonArray()) { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); - JsonArray deviceData = deviceEntry.getValue().getAsJsonArray(); - for (JsonElement element : deviceData) { - JsonConverter.parseWithTs(request, element.getAsJsonObject()); - } - GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); - } - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - } - - public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { - JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); - if (json.isJsonObject()) { - JsonObject jsonObj = json.getAsJsonObject(); - String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString()); - Integer requestId = jsonObj.get("id").getAsInt(); - String data = jsonObj.get("data").toString(); - GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); - ack(mqttMsg); - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - } - - public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { - JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); - int requestId = mqttMsg.variableHeader().messageId(); - if (json.isJsonObject()) { - JsonObject jsonObj = json.getAsJsonObject(); - for (Map.Entry deviceEntry : jsonObj.entrySet()) { - String deviceName = checkDeviceConnected(deviceEntry.getKey()); - if (!deviceEntry.getValue().isJsonObject()) { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - long ts = System.currentTimeMillis(); - BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId); - JsonObject deviceData = deviceEntry.getValue().getAsJsonObject(); - request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); - GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); - } - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - } - - public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException { - JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload()); - if (json.isJsonObject()) { - JsonObject jsonObj = json.getAsJsonObject(); - int requestId = jsonObj.get("id").getAsInt(); - String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); - boolean clientScope = jsonObj.get("client").getAsBoolean(); - Set keys; - if (jsonObj.has("key")) { - keys = Collections.singleton(jsonObj.get("key").getAsString()); - } else { - JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); - keys = new HashSet<>(); - for (JsonElement keyObj : keysArray) { - keys.add(keyObj.getAsString()); - } - } - - BasicGetAttributesRequest request; - if (clientScope) { - request = new BasicGetAttributesRequest(requestId, keys, null); - } else { - request = new BasicGetAttributesRequest(requestId, null, keys); - } - GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), - new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); - ack(msg); - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); - } - } - - private String checkDeviceConnected(String deviceName) { - if (!devices.containsKey(deviceName)) { - log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName); - onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); - } - return deviceName; - } - - private String checkDeviceName(String deviceName) { - if (StringUtils.isEmpty(deviceName)) { - throw new RuntimeException("Device name is empty!"); - } else { - return deviceName; - } - } - - private String getDeviceName(JsonElement json) throws AdaptorException { - return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString(); - } - - private String getDeviceType(JsonElement json) throws AdaptorException { - JsonElement type = json.getAsJsonObject().get("type"); - return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString(); - } - - private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { - return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload()); - } - - protected SessionMsgProcessor getProcessor() { - return processor; - } - - DeviceAuthService getAuthService() { - return authService; - } - - public void setChannel(ChannelHandlerContext channel) { - this.channel = channel; - } - - private void ack(MqttPublishMessage msg) { - if (msg.variableHeader().messageId() > 0) { - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); - } - } - - void writeAndFlush(MqttMessage mqttMessage) { - channel.writeAndFlush(mqttMessage); - } - -} diff --git a/transport/pom.xml b/transport/pom.xml index a01c7ac7ba..2401d3f845 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -37,7 +37,7 @@ http coap - mqtt + mqtt-common mqtt-transport