diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java index 731eeb4599..b88c3f3695 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java @@ -88,7 +88,6 @@ public class RpcController extends BaseController { return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); } - private DeferredResult handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { try { JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 0a3ae8f251..781762a352 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -136,6 +136,15 @@ public abstract class AbstractConsumerServiceorg.apache.kafka kafka-clients + + com.amazonaws + aws-java-sdk-sqs + + org.springframework spring-context-support diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueMsgDecoder.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueMsgDecoder.java new file mode 100644 index 0000000000..4cb38f6685 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueMsgDecoder.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.thingsboard.server.queue.TbQueueMsg; + +public interface TbQueueMsgDecoder { + + T decode(TbQueueMsg msg) throws InvalidProtocolBufferException; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java index fa9d91d149..ef308291a9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java @@ -25,4 +25,5 @@ public interface TbQueueProducer { void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback); + void stop(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 34ad3016b0..40ca2219dd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -92,6 +92,8 @@ public class DefaultTbQueueRequestTemplate responses = responseTemplate.poll(pollInterval); if (responses.size() > 0) { log.trace("Polling responses completed, consumer records count [{}]", responses.size()); + } else { + continue; } responses.forEach(response -> { log.trace("Received response to Kafka Template request: {}", response); @@ -145,6 +147,15 @@ public class DefaultTbQueueRequestTemplate requests = requestTemplate.poll(pollInterval); + if (requests.isEmpty()) { + continue; + } + requests.forEach(request -> { long currentTime = System.currentTimeMillis(); long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME)); @@ -147,6 +151,12 @@ public class DefaultTbQueueResponseTemplate implements TbQueueCon } else { if (!subscribed) { List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - topicNames.forEach(this::createTopicIfNotExists); + topicNames.forEach(admin::createTopicIfNotExists); consumer.subscribe(topicNames); subscribed = true; } @@ -130,7 +130,4 @@ public class TBKafkaConsumerTemplate implements TbQueueCon return decoder.decode(new KafkaTbQueueMsg(record)); } - private void createTopicIfNotExists(String topic) { - admin.createTopicIfNotExists(topic); - } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaProducerTemplate.java index b749625909..2fbfa7d3df 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaProducerTemplate.java @@ -84,4 +84,9 @@ public class TBKafkaProducerTemplate implements TbQueuePro } }); } + + @Override + public void stop() { + + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java index a2fd7450ee..cb1ff939d5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java @@ -50,4 +50,9 @@ public class InMemoryTbQueueProducer implements TbQueuePro } } } + + @Override + public void stop() { + + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueProvider.java new file mode 100644 index 0000000000..206a71d85e --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueProvider.java @@ -0,0 +1,127 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueCoreSettings; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'") +public class AwsSqsMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEngineQueueProvider { + + private final PartitionService partitionService; + private final TbQueueCoreSettings coreSettings; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; + private final TbAwsSqsSettings sqsSettings; + private final TbQueueAdmin admin; + + public AwsSqsMonolithQueueProvider(PartitionService partitionService, TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbServiceInfoProvider serviceInfoProvider, + TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings, + TbAwsSqsSettings sqsSettings) { + this.partitionService = partitionService; + this.coreSettings = coreSettings; + this.serviceInfoProvider = serviceInfoProvider; + this.ruleEngineSettings = ruleEngineSettings; + this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; + this.sqsSettings = sqsSettings; + admin = new TbAwsSqsAdmin(sqsSettings); + } + + @Override + public TbQueueProducer> getTransportNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> getToRuleEngineMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getToRuleEngineNotificationsMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getToCoreMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getToCoreNotificationsMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getTransportApiRequestConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> getTransportApiResponseProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getResponsesTopic()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueProvider.java new file mode 100644 index 0000000000..052d3fc0e3 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueProvider.java @@ -0,0 +1,117 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueCoreSettings; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") +public class AwsSqsTbCoreQueueProvider implements TbCoreQueueProvider { + + private final TbAwsSqsSettings sqsSettings; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbQueueCoreSettings coreSettings; + private final TbQueueTransportApiSettings transportApiSettings; + private final PartitionService partitionService; + private final TbServiceInfoProvider serviceInfoProvider; + + + private final TbQueueAdmin admin; + + public AwsSqsTbCoreQueueProvider(TbAwsSqsSettings sqsSettings, + TbQueueCoreSettings coreSettings, + TbQueueTransportApiSettings transportApiSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + PartitionService partitionService, + TbServiceInfoProvider serviceInfoProvider) { + this.sqsSettings = sqsSettings; + this.coreSettings = coreSettings; + this.transportApiSettings = transportApiSettings; + this.ruleEngineSettings = ruleEngineSettings; + this.partitionService = partitionService; + this.serviceInfoProvider = serviceInfoProvider; + this.admin = new TbAwsSqsAdmin(sqsSettings); + } + + @Override + public TbQueueProducer> getTransportNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> getToCoreMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getToCoreNotificationsMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getTransportApiRequestConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> getTransportApiResponseProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueProvider.java new file mode 100644 index 0000000000..d9f2cec9f4 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueProvider.java @@ -0,0 +1,97 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueCoreSettings; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") +public class AwsSqsTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider { + + private final PartitionService partitionService; + private final TbQueueCoreSettings coreSettings; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbAwsSqsSettings sqsSettings; + private final TbQueueAdmin admin; + + public AwsSqsTbRuleEngineQueueProvider(PartitionService partitionService, TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbServiceInfoProvider serviceInfoProvider, + TbAwsSqsSettings sqsSettings) { + this.partitionService = partitionService; + this.coreSettings = coreSettings; + this.serviceInfoProvider = serviceInfoProvider; + this.ruleEngineSettings = ruleEngineSettings; + this.sqsSettings = sqsSettings; + admin = new TbAwsSqsAdmin(sqsSettings); + } + + @Override + public TbQueueProducer> getTransportNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getRuleEngineNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> getTbCoreNotificationsMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> getToRuleEngineMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> getToRuleEngineNotificationsMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueProvider.java new file mode 100644 index 0000000000..01360ebed8 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueProvider.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") +@Slf4j +public class AwsSqsTransportQueueProvider implements TbTransportQueueProvider { + private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; + private final TbAwsSqsSettings sqsSettings; + private final TbQueueAdmin admin; + private final TbServiceInfoProvider serviceInfoProvider; + + public AwsSqsTransportQueueProvider(TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings, + TbAwsSqsSettings sqsSettings, + TbServiceInfoProvider serviceInfoProvider) { + this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; + this.sqsSettings = sqsSettings; + admin = new TbAwsSqsAdmin(sqsSettings); + this.serviceInfoProvider = serviceInfoProvider; + } + + @Override + public TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate() { + TbAwsSqsProducerTemplate> producerTemplate = + new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); + + TbAwsSqsConsumerTemplate> consumerTemplate = + new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, + transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); + templateBuilder.queueAdmin(admin); + templateBuilder.requestTemplate(producerTemplate); + templateBuilder.responseTemplate(consumerTemplate); + templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); + templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout()); + templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval()); + return templateBuilder.build(); + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); + } + + @Override + public TbQueueConsumer> getTransportNotificationsConsumer() { + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/AwsSqsTbQueueMsgMetadata.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/AwsSqsTbQueueMsgMetadata.java new file mode 100644 index 0000000000..eb4eaed7ed --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/AwsSqsTbQueueMsgMetadata.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import com.amazonaws.http.SdkHttpMetadata; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.queue.TbQueueMsgMetadata; + +@Data +@AllArgsConstructor +public class AwsSqsTbQueueMsgMetadata implements TbQueueMsgMetadata { + + private final SdkHttpMetadata metadata; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java new file mode 100644 index 0000000000..6080baec26 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.QueueAttributeName; +import org.thingsboard.server.queue.TbQueueAdmin; + +import java.util.HashMap; +import java.util.Map; + +public class TbAwsSqsAdmin implements TbQueueAdmin { + + private final TbAwsSqsSettings sqsSettings; + private final Map attributes = new HashMap<>(); + private final AWSStaticCredentialsProvider credProvider; + + public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings) { + this.sqsSettings = sqsSettings; + + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); + this.credProvider = new AWSStaticCredentialsProvider(awsCredentials); + + attributes.put("FifoQueue", "true"); + attributes.put("ContentBasedDeduplication", "true"); + attributes.put(QueueAttributeName.VisibilityTimeout.toString(), sqsSettings.getVisibilityTimeout()); + } + + @Override + public void createTopicIfNotExists(String topic) { + AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() + .withCredentials(credProvider) + .withRegion(sqsSettings.getRegion()) + .build(); + + final CreateQueueRequest createQueueRequest = + new CreateQueueRequest(topic.replaceAll("\\.", "_") + ".fifo") + .withAttributes(attributes); + try { + sqsClient.createQueue(createQueueRequest); + } finally { + if (sqsClient != null) { + sqsClient.shutdown(); + } + } + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java new file mode 100644 index 0000000000..307e05bcec --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -0,0 +1,239 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.gson.Gson; +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.TbQueueMsgHeaders; +import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class TbAwsSqsConsumerTemplate implements TbQueueConsumer { + + private static final int MAX_NUM_MSGS = 10; + + private final Gson gson = new Gson(); + private final TbQueueAdmin admin; + private final AmazonSQS sqsClient; + private final String topic; + private final TbQueueMsgDecoder decoder; + private final TbAwsSqsSettings sqsSettings; + + private final List pendingMessages = new CopyOnWriteArrayList<>(); + private volatile Set queueUrls; + private volatile Set partitions; + private ListeningExecutorService consumerExecutor; + private volatile boolean subscribed; + private volatile boolean stopped = false; + + public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder decoder) { + this.admin = admin; + this.decoder = decoder; + this.topic = topic; + this.sqsSettings = sqsSettings; + + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); + AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); + + this.sqsClient = AmazonSQSClientBuilder.standard() + .withCredentials(credProvider) + .withRegion(sqsSettings.getRegion()) + .build(); + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public void subscribe() { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } + + @Override + public void subscribe(Set partitions) { + this.partitions = partitions; + subscribed = false; + } + + @Override + public void unsubscribe() { + stopped = true; + + if (sqsClient != null) { + sqsClient.shutdown(); + } + if (consumerExecutor != null) { + consumerExecutor.shutdownNow(); + } + } + + @Override + public List poll(long durationInMillis) { + if (!subscribed && partitions == null) { + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + log.debug("Failed to await subscription", e); + } + } else { + if (!subscribed) { + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); + subscribed = true; + } + + if (!pendingMessages.isEmpty()) { + log.warn("Present {} non committed messages.", pendingMessages.size()); + return Collections.emptyList(); + } + + List>> futureList = queueUrls + .stream() + .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis))) + .collect(Collectors.toList()); + ListenableFuture>> futureResult = Futures.allAsList(futureList); + try { + return futureResult.get().stream() + .flatMap(List::stream) + .map(msg -> { + try { + return decode(msg); + } catch (IOException e) { + log.error("Failed to decode message: [{}]", msg); + return null; + } + }).filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Aws SQS consumer is stopped.", topic); + } else { + log.error("Failed to pool messages.", e); + } + } + } + return Collections.emptyList(); + } + + private ListenableFuture> poll(String url, int waitTimeSeconds) { + List>> result = new ArrayList<>(); + + for (int i = 0; i < sqsSettings.getThreadsPerTopic(); i++) { + result.add(consumerExecutor.submit(() -> { + ReceiveMessageRequest request = new ReceiveMessageRequest(); + request + .withWaitTimeSeconds(waitTimeSeconds) + .withMessageAttributeNames("headers") + .withQueueUrl(url) + .withMaxNumberOfMessages(MAX_NUM_MSGS); + return sqsClient.receiveMessage(request).getMessages(); + })); + } + return Futures.transform(Futures.allAsList(result), list -> { + if (!CollectionUtils.isEmpty(list)) { + return list.stream() + .flatMap(messageList -> { + if (!messageList.isEmpty()) { + this.pendingMessages.add(new AwsSqsMsgWrapper(url, messageList)); + return messageList.stream(); + } + return Stream.empty(); + }) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + }, consumerExecutor); + } + + @Override + public void commit() { + pendingMessages.forEach(msg -> + consumerExecutor.submit(() -> { + List entries = msg.getMessages() + .stream() + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) + .collect(Collectors.toList()); + sqsClient.deleteMessageBatch(msg.getUrl(), entries); + })); + + pendingMessages.clear(); + } + + public T decode(Message message) throws InvalidProtocolBufferException { + TbAwsSqsMsg msg = gson.fromJson(message.getBody(), TbAwsSqsMsg.class); + TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); + Map headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken>() { + }.getType()); + headerMap.forEach(headers::put); + msg.setHeaders(headers); + return decoder.decode(msg); + } + + @Data + private static class AwsSqsMsgWrapper { + private final String url; + private final List messages; + + public AwsSqsMsgWrapper(String url, List messages) { + this.url = url; + this.messages = messages; + } + } + + private String getQueueUrl(String topic) { + admin.createTopicIfNotExists(topic); + return sqsClient.getQueueUrl(topic.replaceAll("\\.", "_") + ".fifo").getQueueUrl(); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsMsg.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsMsg.java new file mode 100644 index 0000000000..4df7558491 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsMsg.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import com.google.gson.annotations.Expose; +import lombok.Data; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueMsgHeaders; + +import java.util.UUID; + +@Data +public class TbAwsSqsMsg implements TbQueueMsg { + private final UUID key; + private final byte[] data; + + public TbAwsSqsMsg(UUID key, byte[] data) { + this.key = key; + this.data = data; + } + + @Expose(serialize = false, deserialize = false) + private TbQueueMsgHeaders headers; + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java new file mode 100644 index 0000000000..6514d12747 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -0,0 +1,127 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueProducer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; + +@Slf4j +public class TbAwsSqsProducerTemplate implements TbQueueProducer { + private final String defaultTopic; + private final AmazonSQS sqsClient; + private final Gson gson = new Gson(); + private final Map queueUrlMap = new ConcurrentHashMap<>(); + private final TbQueueAdmin admin; + private ListeningExecutorService producerExecutor; + + public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { + this.admin = admin; + this.defaultTopic = defaultTopic; + + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); + AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); + + this.sqsClient = AmazonSQSClientBuilder.standard() + .withCredentials(credProvider) + .withRegion(sqsSettings.getRegion()) + .build(); + + producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + } + + @Override + public void init() { + + } + + @Override + public String getDefaultTopic() { + return defaultTopic; + } + + @Override + public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { + SendMessageRequest sendMsgRequest = new SendMessageRequest(); + sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); + sendMsgRequest.withMessageBody(gson.toJson(new TbAwsSqsMsg(msg.getKey(), msg.getData()))); + + Map attributes = new HashMap<>(); + + attributes.put("headers", new MessageAttributeValue() + .withStringValue(gson.toJson(msg.getHeaders().getData())) + .withDataType("String")); + + sendMsgRequest.withMessageAttributes(attributes); + sendMsgRequest.withMessageGroupId(msg.getKey().toString()); + ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); + + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(SendMessageResult result) { + if (callback != null) { + callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata())); + } + } + + @Override + public void onFailure(Throwable t) { + if (callback != null) { + callback.onFailure(t); + } + } + }); + } + + @Override + public void stop() { + if (producerExecutor != null) { + producerExecutor.shutdownNow(); + } + if (sqsClient != null) { + sqsClient.shutdown(); + } + } + + private String getQueueUrl(String topic) { + return queueUrlMap.computeIfAbsent(topic, k -> { + admin.createTopicIfNotExists(topic); + return sqsClient.getQueueUrl(topic.replaceAll("\\.", "_") + ".fifo").getQueueUrl(); + }); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java new file mode 100644 index 0000000000..42a2f1e983 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.sqs; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +@Slf4j +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") +@Component +@Data +public class TbAwsSqsSettings { + + @Value("${queue.aws_sqs.access_key_id}") + private String accessKeyId; + + @Value("${queue.aws_sqs.secret_access_key}") + private String secretAccessKey; + + @Value("${queue.aws_sqs.region}") + private String region; + + @Value("${queue.aws_sqs.threads_per_topic}") + private int threadsPerTopic; + + @Value("${queue.aws_sqs.visibility_timeout}") + private String visibilityTimeout; +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 3891b197a6..3a707d11a7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -138,6 +138,9 @@ public class DefaultTransportService implements TransportService { while (!stopped) { try { List> records = transportNotificationsConsumer.poll(notificationsPollDuration); + if (records.size() == 0) { + continue; + } records.forEach(record -> { try { ToTransportMsg toTransportMsg = record.getValue(); @@ -170,6 +173,10 @@ public class DefaultTransportService implements TransportService { perDeviceLimits.clear(); } stopped = true; + + if (transportNotificationsConsumer != null) { + transportNotificationsConsumer.unsubscribe(); + } if (schedulerExecutor != null) { schedulerExecutor.shutdownNow(); } diff --git a/pom.xml b/pom.xml index 547c97448f..9863982929 100755 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ 2.57 2.7.7 1.23 + 1.11.747 1.5.0 1.4.3 @@ -886,6 +887,11 @@ jts-core ${jts.version} + + com.amazonaws + aws-java-sdk-sqs + ${amazonaws.sqs.version} + org.passay passay diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index 373e4107f8..bf629a6f10 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -35,7 +35,7 @@ UTF-8 ${basedir}/../.. - 1.11.323 + 1.11.747 1.83.0 1.16.0 diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index f4a31449e7..6ac9b08777 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -63,7 +63,7 @@ transport: max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" queue: - type: "${TB_QUEUE_TYPE:kafka}" # kafka or ? + type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" acks: "${TB_KAFKA_ACKS:all}" @@ -71,6 +71,10 @@ queue: batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" linger.ms: "${TB_KAFKA_LINGER_MS:1}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" + aws_sqs: + access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" + secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" + region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"