diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7178b94dfa..81444769d0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 3ab67bf81f..3a03cdadde 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 04c9205a12..cb562a7f52 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -517,7 +517,7 @@ swagger: version: "${SWAGGER_VERSION:2.0}" queue: - type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub + type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" acks: "${TB_KAFKA_ACKS:all}" @@ -537,6 +537,11 @@ queue: ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + service_bus: + namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" + sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" + sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" + max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 7230804773..10c372d546 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/queue/pom.xml b/common/queue/pom.xml index 0a61eb0cf6..f55f69ece1 100644 --- a/common/queue/pom.xml +++ b/common/queue/pom.xml @@ -60,6 +60,10 @@ com.google.cloud google-cloud-pubsub + + com.microsoft.azure + azure-servicebus + org.springframework spring-context-support diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java new file mode 100644 index 0000000000..fb9cd89a8d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.azure.servicebus; + +import com.microsoft.azure.servicebus.management.ManagementClient; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.queue.TbQueueAdmin; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") +public class TbServiceBusAdmin implements TbQueueAdmin { + + private final Set queues = ConcurrentHashMap.newKeySet(); + + private final ManagementClient client; + + public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) { + ConnectionStringBuilder builder = new ConnectionStringBuilder( + serviceBusSettings.getNamespaceName(), + "queues", + serviceBusSettings.getSasKeyName(), + serviceBusSettings.getSasKey()); + + client = new ManagementClient(builder); + + try { + client.getQueues().forEach(queueDescription -> queues.add(queueDescription.getPath())); + } catch (ServiceBusException | InterruptedException e) { + log.error("Failed to get queues.", e); + throw new RuntimeException("Failed to get queues.", e); + } + } + + @Override + public void createTopicIfNotExists(String topic) { + if (queues.contains(topic)) { + return; + } + + try { + client.createQueue(topic); + queues.add(topic); + } catch (ServiceBusException | InterruptedException e) { + log.error("Failed to create queue: [{}]", topic, e); + } + } + + @PreDestroy + private void destroy() { + try { + client.close(); + } catch (IOException e) { + log.error("Failed to close ManagementClient."); + } + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java new file mode 100644 index 0000000000..cca599d59a --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java @@ -0,0 +1,210 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.azure.servicebus; + +import com.google.gson.Gson; +import com.google.protobuf.InvalidProtocolBufferException; +import com.microsoft.azure.servicebus.TransactionContext; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver; +import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag; +import com.microsoft.azure.servicebus.primitives.MessagingEntityType; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.SettleModePair; +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.springframework.util.CollectionUtils; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class TbServiceBusConsumerTemplate implements TbQueueConsumer { + private final TbQueueAdmin admin; + private final String topic; + private final TbQueueMsgDecoder decoder; + private final TbServiceBusSettings serviceBusSettings; + + private final Gson gson = new Gson(); + + private Set receivers; + private volatile Set partitions; + private volatile boolean subscribed; + private volatile boolean stopped = false; + private Map> pendingMessages = new ConcurrentHashMap<>(); + private volatile int messagesPerQueue; + + public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder decoder) { + this.admin = admin; + this.decoder = decoder; + this.topic = topic; + this.serviceBusSettings = serviceBusSettings; + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public void subscribe() { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } + + @Override + public void subscribe(Set partitions) { + this.partitions = partitions; + subscribed = false; + } + + @Override + public void unsubscribe() { + stopped = true; + receivers.forEach(CoreMessageReceiver::closeAsync); + } + + @Override + public List poll(long durationInMillis) { + if (!subscribed && partitions == null) { + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + log.debug("Failed to await subscription", e); + } + } else { + if (!subscribed) { + createReceivers(); + messagesPerQueue = receivers.size() / partitions.size(); + subscribed = true; + } + + List>> messageFutures = + receivers.stream() + .map(receiver -> receiver + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) + .whenComplete((messages, err) -> { + if (!CollectionUtils.isEmpty(messages)) { + pendingMessages.put(receiver, messages); + } else if (err != null) { + log.error("Failed to receive messages.", err); + } + })) + .collect(Collectors.toList()); + try { + return fromList(messageFutures) + .get() + .stream() + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) + .map(message -> { + try { + return decode(message); + } catch (InvalidProtocolBufferException e) { + log.error("Failed to parse message.", e); + throw new RuntimeException("Failed to parse message.", e); + } + }).collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Service Bus consumer is stopped.", topic); + } else { + log.error("Failed to receive messages", e); + } + } + } + return Collections.emptyList(); + } + + private void createReceivers() { + List> receiverFutures = partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .map(queue -> { + MessagingFactory factory; + try { + factory = MessagingFactory.createFromConnectionStringBuilder(createConnection(queue)); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to create factory for the queue [{}]", queue); + throw new RuntimeException("Failed to create the factory", e); + } + + return CoreMessageReceiver.create(factory, queue, queue, 0, + new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND), + MessagingEntityType.QUEUE); + }).collect(Collectors.toList()); + + try { + receivers = new HashSet<>(fromList(receiverFutures).get()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Service Bus consumer is stopped.", topic); + } else { + log.error("Failed to create receivers", e); + } + } + } + + private ConnectionStringBuilder createConnection(String queue) { + admin.createTopicIfNotExists(queue); + return new ConnectionStringBuilder( + serviceBusSettings.getNamespaceName(), + queue, + serviceBusSettings.getSasKeyName(), + serviceBusSettings.getSasKey()); + } + + private CompletableFuture> fromList(List> futures) { + CompletableFuture>[] arrayFuture = new CompletableFuture[futures.size()]; + futures.toArray(arrayFuture); + + return CompletableFuture + .allOf(arrayFuture) + .thenApply(v -> futures + .stream() + .map(CompletableFuture::join) + .collect(Collectors.toList())); + } + + @Override + public void commit() { + pendingMessages.forEach((receiver, msgs) -> + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); + pendingMessages.clear(); + } + + private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { + DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); + return decoder.decode(msg); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java new file mode 100644 index 0000000000..5d9a931378 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java @@ -0,0 +1,110 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.azure.servicebus; + +import com.google.gson.Gson; +import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.Message; +import com.microsoft.azure.servicebus.QueueClient; +import com.microsoft.azure.servicebus.ReceiveMode; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class TbServiceBusProducerTemplate implements TbQueueProducer { + private final String defaultTopic; + private final Gson gson = new Gson(); + private final TbQueueAdmin admin; + private final TbServiceBusSettings serviceBusSettings; + private final Map clients = new HashMap<>(); + private ExecutorService executorService; + + public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) { + this.admin = admin; + this.defaultTopic = defaultTopic; + this.serviceBusSettings = serviceBusSettings; + executorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void init() { + + } + + @Override + public String getDefaultTopic() { + return defaultTopic; + } + + @Override + public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { + IMessage message = new Message(gson.toJson(new DefaultTbQueueMsg(msg))); + CompletableFuture future = getClient(tpi.getFullTopicName()).sendAsync(message); + future.whenCompleteAsync((success, err) -> { + if (err != null) { + callback.onFailure(err); + } else { + callback.onSuccess(null); + } + }, executorService); + } + + @Override + public void stop() { + clients.forEach((t, client) -> { + try { + client.close(); + } catch (ServiceBusException e) { + log.error("Failed to close QueueClient.", e); + } + }); + + if (executorService != null) { + executorService.shutdownNow(); + } + } + + private QueueClient getClient(String topic) { + return clients.computeIfAbsent(topic, k -> { + admin.createTopicIfNotExists(topic); + ConnectionStringBuilder builder = + new ConnectionStringBuilder( + serviceBusSettings.getNamespaceName(), + topic, + serviceBusSettings.getSasKeyName(), + serviceBusSettings.getSasKey()); + try { + return new QueueClient(builder, ReceiveMode.PEEKLOCK); + } catch (InterruptedException | ServiceBusException e) { + log.error("Failed to create new client for the Queue: [{}]", topic, e); + throw new RuntimeException("Failed to create new client for the Queue", e); + } + }); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java new file mode 100644 index 0000000000..d872dcec0b --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.azure.servicebus; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +@Slf4j +@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") +@Component +@Data +public class TbServiceBusSettings { + @Value("${queue.service_bus.namespace_name}") + private String namespaceName; + @Value("${queue.service_bus.sas_key_name}") + private String sasKeyName; + @Value("${queue.service_bus.sas_key}") + private String sasKey; + @Value("${queue.service_bus.max_messages}") + private int maxMessages; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java index 0e816ae59b..c7e439ef7d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.queue.common; -import com.google.gson.annotations.Expose; import lombok.Data; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgHeaders; @@ -26,12 +25,20 @@ import java.util.UUID; public class DefaultTbQueueMsg implements TbQueueMsg { private final UUID key; private final byte[] data; + private DefaultTbQueueMsgHeaders headers; + public DefaultTbQueueMsg(UUID key, byte[] data) { this.key = key; this.data = data; } - @Expose(serialize = false, deserialize = false) - private TbQueueMsgHeaders headers; + public DefaultTbQueueMsg(TbQueueMsg msg) { + this.key = msg.getKey(); + this.data = msg.getData(); + DefaultTbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); + msg.getHeaders().getData().forEach(headers::put); + this.headers = headers; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java index 1337a1cc74..f36ae70dfa 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java @@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 72d5731139..9f70345301 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -21,14 +21,14 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index 1fba780395..770e7fa65c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -18,21 +18,22 @@ package org.thingsboard.server.queue.provider; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; @@ -48,8 +49,6 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; - - private final TbQueueAdmin admin; public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings, @@ -78,7 +77,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { + public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); } @@ -88,7 +87,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createTbCoreNotificationsMsgProducer() { + public TbQueueProducer> createTbCoreNotificationsMsgProducer() { return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); } @@ -99,15 +98,16 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { + public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), - msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java index 4181a9442d..f87f108453 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java @@ -24,12 +24,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; @@ -86,7 +86,8 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java index 09ca194724..4196b226ee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java @@ -18,16 +18,20 @@ package org.thingsboard.server.queue.provider; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; @@ -55,17 +59,17 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { - TbAwsSqsProducerTemplate> producerTemplate = + public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { + TbAwsSqsProducerTemplate> producerTemplate = new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); - TbAwsSqsConsumerTemplate> consumerTemplate = + TbAwsSqsConsumerTemplate> consumerTemplate = new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(), - msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder - , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); + , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); templateBuilder.queueAdmin(admin); templateBuilder.requestTemplate(producerTemplate); templateBuilder.responseTemplate(consumerTemplate); @@ -76,18 +80,18 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { } @Override - public TbQueueProducer> createRuleEngineMsgProducer() { + public TbQueueProducer> createRuleEngineMsgProducer() { return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); } @Override - public TbQueueProducer> createTbCoreMsgProducer() { + public TbQueueProducer> createTbCoreMsgProducer() { return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); } @Override - public TbQueueConsumer> createTransportNotificationsConsumer() { + public TbQueueConsumer> createTransportNotificationsConsumer() { return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(), - msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index b40876d661..a23c93e015 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -26,14 +26,14 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @Slf4j diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 9914cf65f9..c779127e1f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -19,22 +19,22 @@ import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.TbQueueRequestTemplate; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; -import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; @Component @ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 8b84d08bff..fa36f8284b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -26,17 +26,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @Component diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 7083553abb..5e46d67eed 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -26,16 +26,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; @Component @ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 9b8a9c4ba0..eef10f35dc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -24,15 +24,15 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @Component diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index 4aa35e5d68..f00bd7caef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -27,11 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -39,6 +35,10 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; @Component @@ -54,8 +54,6 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; - private TbQueueProducer> tbCoreProducer; - public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java new file mode 100644 index 0000000000..914e200fc9 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") +public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { + + private final PartitionService partitionService; + private final TbQueueCoreSettings coreSettings; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; + private final TbServiceBusSettings serviceBusSettings; + private final TbQueueAdmin admin; + + public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbServiceInfoProvider serviceInfoProvider, + TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings, + TbServiceBusSettings serviceBusSettings, + TbQueueAdmin admin) { + this.partitionService = partitionService; + this.coreSettings = coreSettings; + this.serviceInfoProvider = serviceInfoProvider; + this.ruleEngineSettings = ruleEngineSettings; + this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; + this.serviceBusSettings = serviceBusSettings; + this.admin = admin; + } + + @Override + public TbQueueProducer> createTransportNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToCoreMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createTransportApiRequestConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createTransportApiResponseProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java new file mode 100644 index 0000000000..334af58133 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java @@ -0,0 +1,116 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") +public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory { + + private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbQueueCoreSettings coreSettings; + private final TbQueueTransportApiSettings transportApiSettings; + private final PartitionService partitionService; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueAdmin admin; + + public ServiceBusTbCoreQueueProvider(TbServiceBusSettings serviceBusSettings, + TbQueueCoreSettings coreSettings, + TbQueueTransportApiSettings transportApiSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + PartitionService partitionService, + TbServiceInfoProvider serviceInfoProvider, + TbQueueAdmin admin) { + this.serviceBusSettings = serviceBusSettings; + this.coreSettings = coreSettings; + this.transportApiSettings = transportApiSettings; + this.ruleEngineSettings = ruleEngineSettings; + this.partitionService = partitionService; + this.serviceInfoProvider = serviceInfoProvider; + this.admin = admin; + } + + @Override + public TbQueueProducer> createTransportNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> createToCoreMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createTransportApiRequestConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createTransportApiResponseProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java new file mode 100644 index 0000000000..3ceb837606 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -0,0 +1,99 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") +public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { + + private final PartitionService partitionService; + private final TbQueueCoreSettings coreSettings; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRuleEngineSettings ruleEngineSettings; + private final TbServiceBusSettings serviceBusSettings; + private final TbQueueAdmin admin; + + public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, + TbQueueRuleEngineSettings ruleEngineSettings, + TbServiceInfoProvider serviceInfoProvider, + TbServiceBusSettings serviceBusSettings, + TbQueueAdmin admin) { + this.partitionService = partitionService; + this.coreSettings = coreSettings; + this.serviceInfoProvider = serviceInfoProvider; + this.ruleEngineSettings = ruleEngineSettings; + this.serviceBusSettings = serviceBusSettings; + this.admin = admin; + } + + @Override + public TbQueueProducer> createTransportNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueProducer> createTbCoreNotificationsMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java new file mode 100644 index 0000000000..d38a4389a3 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java @@ -0,0 +1,94 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.provider; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") +@Slf4j +public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory { + private final TbQueueTransportApiSettings transportApiSettings; + private final TbQueueTransportNotificationSettings transportNotificationSettings; + private final TbServiceBusSettings serviceBusSettings; + private final TbQueueAdmin admin; + private final TbServiceInfoProvider serviceInfoProvider; + + public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, + TbQueueTransportNotificationSettings transportNotificationSettings, + TbServiceBusSettings serviceBusSettings, + TbServiceInfoProvider serviceInfoProvider, + TbQueueAdmin admin) { + this.transportApiSettings = transportApiSettings; + this.transportNotificationSettings = transportNotificationSettings; + this.serviceBusSettings = serviceBusSettings; + this.admin = admin; + this.serviceInfoProvider = serviceInfoProvider; + } + + @Override + public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { + TbQueueProducer> producerTemplate = + new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + + TbQueueConsumer> consumerTemplate = + new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); + templateBuilder.queueAdmin(admin); + templateBuilder.requestTemplate(producerTemplate); + templateBuilder.responseTemplate(consumerTemplate); + templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); + templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout()); + templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval()); + return templateBuilder.build(); + } + + @Override + public TbQueueProducer> createRuleEngineMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + } + + @Override + public TbQueueProducer> createTbCoreMsgProducer() { + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + } + + @Override + public TbQueueConsumer> createTransportNotificationsConsumer() { + return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index 4109e72070..5dc795739a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; -import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.protobuf.InvalidProtocolBufferException; import com.google.pubsub.v1.AcknowledgeRequest; @@ -36,15 +35,12 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; -import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -139,7 +135,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); subscriptionNames.forEach(admin::createTopicIfNotExists); consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); - messagesPerTopic = pubSubSettings.getMaxMessages()/subscriptionNames.size(); + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); subscribed = true; } List messages; @@ -217,11 +213,6 @@ public class TbPubSubConsumerTemplate implements TbQueueCo public T decode(PubsubMessage message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); - TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); - Map headerMap = gson.fromJson(message.getAttributesMap().get("headers"), new TypeToken>() { - }.getType()); - headerMap.forEach(headers::put); - msg.setHeaders(headers); return decoder.decode(msg); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java index fb9bc8b189..2cd2e1054e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java @@ -71,7 +71,6 @@ public class TbPubSubProducerTemplate implements TbQueuePr public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder(); pubsubMessageBuilder.setData(getMsg(msg)); - pubsubMessageBuilder.putAttributes("headers", gson.toJson(msg.getHeaders().getData())); Publisher publisher = getOrCreatePublisher(tpi.getFullTopicName()); ApiFuture future = publisher.publish(pubsubMessageBuilder.build()); @@ -110,7 +109,7 @@ public class TbPubSubProducerTemplate implements TbQueuePr } private ByteString getMsg(T msg) { - String json = gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData())); + String json = gson.toJson(new DefaultTbQueueMsg(msg)); return ByteString.copyFrom(json.getBytes()); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java index b4a2f84b83..3e71388844 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -23,7 +23,6 @@ import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -38,15 +37,12 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; -import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -161,7 +157,7 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo if (stopped) { log.info("[{}] Aws SQS consumer is stopped.", topic); } else { - log.error("Failed to pool messages.", e); + log.error("Failed to pool messages.", e); } } } @@ -214,11 +210,6 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo public T decode(Message message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); - TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); - Map headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken>() { - }.getType()); - headerMap.forEach(headers::put); - msg.setHeaders(headers); return decoder.decode(msg); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index a5707c845d..d304953ef4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -20,7 +20,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import com.google.common.util.concurrent.FutureCallback; @@ -37,7 +36,6 @@ import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -82,13 +80,6 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData()))); - Map attributes = new HashMap<>(); - - attributes.put("headers", new MessageAttributeValue() - .withStringValue(gson.toJson(msg.getHeaders().getData())) - .withDataType("String")); - - sendMsgRequest.withMessageAttributes(attributes); sendMsgRequest.withMessageGroupId(msg.getKey().toString()); ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); diff --git a/pom.xml b/pom.xml index e385bff9ce..96f336c3a2 100755 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ 1.23 1.11.747 1.84.0 + 3.2.0 1.5.0 1.4.3 @@ -898,6 +899,11 @@ google-cloud-pubsub ${pubsub.client.version} + + com.microsoft.azure + azure-servicebus + ${azure-servicebus.version} + org.passay passay diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index a875d56782..37ac4c14c2 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -63,7 +63,7 @@ transport: max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" queue: - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub + type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" acks: "${TB_KAFKA_ACKS:all}" @@ -83,6 +83,11 @@ queue: ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + service_bus: + namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}" + sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" + sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" + max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"