From 45bd764e6fec0f994f5345a20653c185f5b190ad Mon Sep 17 00:00:00 2001
From: Yevhen Bondarenko <56396344+YevhenBondarenko@users.noreply.github.com>
Date: Tue, 7 Apr 2020 11:45:28 +0300
Subject: [PATCH] Azure ServiceBus queue
* created event hubs queue
* created servicebus queue
* refactored
* refactored
---
.../actors/ruleChain/DefaultTbContext.java | 2 +-
.../RuleChainActorMessageProcessor.java | 2 +-
.../src/main/resources/thingsboard.yml | 7 +-
.../thingsboard/server/common/msg/TbMsg.java | 2 +-
common/queue/pom.xml | 4 +
.../azure/servicebus/TbServiceBusAdmin.java | 79 +++++++
.../TbServiceBusConsumerTemplate.java | 210 ++++++++++++++++++
.../TbServiceBusProducerTemplate.java | 110 +++++++++
.../servicebus/TbServiceBusSettings.java | 37 +++
.../queue/common/DefaultTbQueueMsg.java | 13 +-
.../ConsistentHashPartitionService.java | 1 -
.../provider/AwsSqsMonolithQueueFactory.java | 8 +-
.../provider/AwsSqsTbCoreQueueFactory.java | 22 +-
.../AwsSqsTbRuleEngineQueueFactory.java | 7 +-
.../provider/AwsSqsTransportQueueFactory.java | 28 ++-
.../InMemoryMonolithQueueFactory.java | 8 +-
.../InMemoryTbTransportQueueFactory.java | 18 +-
.../provider/KafkaMonolithQueueFactory.java | 8 +-
.../provider/KafkaTbCoreQueueFactory.java | 6 +-
.../KafkaTbRuleEngineQueueFactory.java | 4 +-
.../provider/PubSubMonolithQueueFactory.java | 10 +-
.../ServiceBusMonolithQueueFactory.java | 134 +++++++++++
.../ServiceBusTbCoreQueueProvider.java | 116 ++++++++++
.../ServiceBusTbRuleEngineQueueFactory.java | 99 +++++++++
.../ServiceBusTransportQueueFactory.java | 94 ++++++++
.../pubsub/TbPubSubConsumerTemplate.java | 11 +-
.../pubsub/TbPubSubProducerTemplate.java | 3 +-
.../queue/sqs/TbAwsSqsConsumerTemplate.java | 11 +-
.../queue/sqs/TbAwsSqsProducerTemplate.java | 9 -
pom.xml | 6 +
.../src/main/resources/tb-mqtt-transport.yml | 7 +-
31 files changed, 978 insertions(+), 98 deletions(-)
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java
create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 7178b94dfa..81444769d0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 3ab67bf81f..3a03cdadde 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 04c9205a12..cb562a7f52 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -517,7 +517,7 @@ swagger:
version: "${SWAGGER_VERSION:2.0}"
queue:
- type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub
+ type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus
kafka:
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
@@ -537,6 +537,11 @@ queue:
ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
+ service_bus:
+ namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
+ sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
+ sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
+ max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
partitions:
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 7230804773..10c372d546 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
index 0a61eb0cf6..f55f69ece1 100644
--- a/common/queue/pom.xml
+++ b/common/queue/pom.xml
@@ -60,6 +60,10 @@
com.google.cloud
google-cloud-pubsub
+
+ com.microsoft.azure
+ azure-servicebus
+
org.springframework
spring-context-support
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java
new file mode 100644
index 0000000000..fb9cd89a8d
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.azure.servicebus;
+
+import com.microsoft.azure.servicebus.management.ManagementClient;
+import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.primitives.ServiceBusException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.queue.TbQueueAdmin;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Component
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
+public class TbServiceBusAdmin implements TbQueueAdmin {
+
+ private final Set queues = ConcurrentHashMap.newKeySet();
+
+ private final ManagementClient client;
+
+ public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) {
+ ConnectionStringBuilder builder = new ConnectionStringBuilder(
+ serviceBusSettings.getNamespaceName(),
+ "queues",
+ serviceBusSettings.getSasKeyName(),
+ serviceBusSettings.getSasKey());
+
+ client = new ManagementClient(builder);
+
+ try {
+ client.getQueues().forEach(queueDescription -> queues.add(queueDescription.getPath()));
+ } catch (ServiceBusException | InterruptedException e) {
+ log.error("Failed to get queues.", e);
+ throw new RuntimeException("Failed to get queues.", e);
+ }
+ }
+
+ @Override
+ public void createTopicIfNotExists(String topic) {
+ if (queues.contains(topic)) {
+ return;
+ }
+
+ try {
+ client.createQueue(topic);
+ queues.add(topic);
+ } catch (ServiceBusException | InterruptedException e) {
+ log.error("Failed to create queue: [{}]", topic, e);
+ }
+ }
+
+ @PreDestroy
+ private void destroy() {
+ try {
+ client.close();
+ } catch (IOException e) {
+ log.error("Failed to close ManagementClient.");
+ }
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java
new file mode 100644
index 0000000000..cca599d59a
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.azure.servicebus;
+
+import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.microsoft.azure.servicebus.TransactionContext;
+import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver;
+import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
+import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
+import com.microsoft.azure.servicebus.primitives.MessagingFactory;
+import com.microsoft.azure.servicebus.primitives.SettleModePair;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.springframework.util.CollectionUtils;
+import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueMsg;
+import org.thingsboard.server.queue.TbQueueMsgDecoder;
+import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TbServiceBusConsumerTemplate implements TbQueueConsumer {
+ private final TbQueueAdmin admin;
+ private final String topic;
+ private final TbQueueMsgDecoder decoder;
+ private final TbServiceBusSettings serviceBusSettings;
+
+ private final Gson gson = new Gson();
+
+ private Set receivers;
+ private volatile Set partitions;
+ private volatile boolean subscribed;
+ private volatile boolean stopped = false;
+ private Map> pendingMessages = new ConcurrentHashMap<>();
+ private volatile int messagesPerQueue;
+
+ public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder decoder) {
+ this.admin = admin;
+ this.decoder = decoder;
+ this.topic = topic;
+ this.serviceBusSettings = serviceBusSettings;
+ }
+
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public void subscribe() {
+ partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
+ subscribed = false;
+ }
+
+ @Override
+ public void subscribe(Set partitions) {
+ this.partitions = partitions;
+ subscribed = false;
+ }
+
+ @Override
+ public void unsubscribe() {
+ stopped = true;
+ receivers.forEach(CoreMessageReceiver::closeAsync);
+ }
+
+ @Override
+ public List poll(long durationInMillis) {
+ if (!subscribed && partitions == null) {
+ try {
+ Thread.sleep(durationInMillis);
+ } catch (InterruptedException e) {
+ log.debug("Failed to await subscription", e);
+ }
+ } else {
+ if (!subscribed) {
+ createReceivers();
+ messagesPerQueue = receivers.size() / partitions.size();
+ subscribed = true;
+ }
+
+ List>> messageFutures =
+ receivers.stream()
+ .map(receiver -> receiver
+ .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
+ .whenComplete((messages, err) -> {
+ if (!CollectionUtils.isEmpty(messages)) {
+ pendingMessages.put(receiver, messages);
+ } else if (err != null) {
+ log.error("Failed to receive messages.", err);
+ }
+ }))
+ .collect(Collectors.toList());
+ try {
+ return fromList(messageFutures)
+ .get()
+ .stream()
+ .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
+ .map(message -> {
+ try {
+ return decode(message);
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Failed to parse message.", e);
+ throw new RuntimeException("Failed to parse message.", e);
+ }
+ }).collect(Collectors.toList());
+ } catch (InterruptedException | ExecutionException e) {
+ if (stopped) {
+ log.info("[{}] Service Bus consumer is stopped.", topic);
+ } else {
+ log.error("Failed to receive messages", e);
+ }
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ private void createReceivers() {
+ List> receiverFutures = partitions.stream()
+ .map(TopicPartitionInfo::getFullTopicName)
+ .map(queue -> {
+ MessagingFactory factory;
+ try {
+ factory = MessagingFactory.createFromConnectionStringBuilder(createConnection(queue));
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed to create factory for the queue [{}]", queue);
+ throw new RuntimeException("Failed to create the factory", e);
+ }
+
+ return CoreMessageReceiver.create(factory, queue, queue, 0,
+ new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND),
+ MessagingEntityType.QUEUE);
+ }).collect(Collectors.toList());
+
+ try {
+ receivers = new HashSet<>(fromList(receiverFutures).get());
+ } catch (InterruptedException | ExecutionException e) {
+ if (stopped) {
+ log.info("[{}] Service Bus consumer is stopped.", topic);
+ } else {
+ log.error("Failed to create receivers", e);
+ }
+ }
+ }
+
+ private ConnectionStringBuilder createConnection(String queue) {
+ admin.createTopicIfNotExists(queue);
+ return new ConnectionStringBuilder(
+ serviceBusSettings.getNamespaceName(),
+ queue,
+ serviceBusSettings.getSasKeyName(),
+ serviceBusSettings.getSasKey());
+ }
+
+ private CompletableFuture> fromList(List> futures) {
+ CompletableFuture>[] arrayFuture = new CompletableFuture[futures.size()];
+ futures.toArray(arrayFuture);
+
+ return CompletableFuture
+ .allOf(arrayFuture)
+ .thenApply(v -> futures
+ .stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public void commit() {
+ pendingMessages.forEach((receiver, msgs) ->
+ msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
+ pendingMessages.clear();
+ }
+
+ private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
+ DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
+ return decoder.decode(msg);
+ }
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java
new file mode 100644
index 0000000000..5d9a931378
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.azure.servicebus;
+
+import com.google.gson.Gson;
+import com.microsoft.azure.servicebus.IMessage;
+import com.microsoft.azure.servicebus.Message;
+import com.microsoft.azure.servicebus.QueueClient;
+import com.microsoft.azure.servicebus.ReceiveMode;
+import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.primitives.ServiceBusException;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueCallback;
+import org.thingsboard.server.queue.TbQueueMsg;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Slf4j
+public class TbServiceBusProducerTemplate implements TbQueueProducer {
+ private final String defaultTopic;
+ private final Gson gson = new Gson();
+ private final TbQueueAdmin admin;
+ private final TbServiceBusSettings serviceBusSettings;
+ private final Map clients = new HashMap<>();
+ private ExecutorService executorService;
+
+ public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
+ this.admin = admin;
+ this.defaultTopic = defaultTopic;
+ this.serviceBusSettings = serviceBusSettings;
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public String getDefaultTopic() {
+ return defaultTopic;
+ }
+
+ @Override
+ public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
+ IMessage message = new Message(gson.toJson(new DefaultTbQueueMsg(msg)));
+ CompletableFuture future = getClient(tpi.getFullTopicName()).sendAsync(message);
+ future.whenCompleteAsync((success, err) -> {
+ if (err != null) {
+ callback.onFailure(err);
+ } else {
+ callback.onSuccess(null);
+ }
+ }, executorService);
+ }
+
+ @Override
+ public void stop() {
+ clients.forEach((t, client) -> {
+ try {
+ client.close();
+ } catch (ServiceBusException e) {
+ log.error("Failed to close QueueClient.", e);
+ }
+ });
+
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+
+ private QueueClient getClient(String topic) {
+ return clients.computeIfAbsent(topic, k -> {
+ admin.createTopicIfNotExists(topic);
+ ConnectionStringBuilder builder =
+ new ConnectionStringBuilder(
+ serviceBusSettings.getNamespaceName(),
+ topic,
+ serviceBusSettings.getSasKeyName(),
+ serviceBusSettings.getSasKey());
+ try {
+ return new QueueClient(builder, ReceiveMode.PEEKLOCK);
+ } catch (InterruptedException | ServiceBusException e) {
+ log.error("Failed to create new client for the Queue: [{}]", topic, e);
+ throw new RuntimeException("Failed to create new client for the Queue", e);
+ }
+ });
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java
new file mode 100644
index 0000000000..d872dcec0b
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusSettings.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.azure.servicebus;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
+@Component
+@Data
+public class TbServiceBusSettings {
+ @Value("${queue.service_bus.namespace_name}")
+ private String namespaceName;
+ @Value("${queue.service_bus.sas_key_name}")
+ private String sasKeyName;
+ @Value("${queue.service_bus.sas_key}")
+ private String sasKey;
+ @Value("${queue.service_bus.max_messages}")
+ private int maxMessages;
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java
index 0e816ae59b..c7e439ef7d 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.queue.common;
-import com.google.gson.annotations.Expose;
import lombok.Data;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
@@ -26,12 +25,20 @@ import java.util.UUID;
public class DefaultTbQueueMsg implements TbQueueMsg {
private final UUID key;
private final byte[] data;
+ private DefaultTbQueueMsgHeaders headers;
+
public DefaultTbQueueMsg(UUID key, byte[] data) {
this.key = key;
this.data = data;
}
- @Expose(serialize = false, deserialize = false)
- private TbQueueMsgHeaders headers;
+ public DefaultTbQueueMsg(TbQueueMsg msg) {
+ this.key = msg.getKey();
+ this.data = msg.getData();
+ DefaultTbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
+ msg.getHeaders().getData().forEach(headers::put);
+ this.headers = headers;
+ }
+
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java
index 1337a1cc74..f36ae70dfa 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java
@@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java
index 72d5731139..9f70345301 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java
@@ -21,14 +21,14 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java
index 1fba780395..770e7fa65c 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java
@@ -18,21 +18,22 @@ package org.thingsboard.server.queue.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
-import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
@@ -48,8 +49,6 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueTransportApiSettings transportApiSettings;
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
-
-
private final TbQueueAdmin admin;
public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,
@@ -78,7 +77,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
- public TbQueueProducer> createRuleEngineNotificationsMsgProducer() {
+ public TbQueueProducer> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
}
@@ -88,7 +87,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
- public TbQueueProducer> createTbCoreNotificationsMsgProducer() {
+ public TbQueueProducer> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
}
@@ -99,15 +98,16 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
- public TbQueueConsumer> createToCoreNotificationsMsgConsumer() {
+ public TbQueueConsumer> createToCoreNotificationsMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
- msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer> createTransportApiRequestConsumer() {
- return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java
index 4181a9442d..f87f108453 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java
@@ -24,12 +24,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
@@ -86,7 +86,8 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
- return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java
index 09ca194724..4196b226ee 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTransportQueueFactory.java
@@ -18,16 +18,20 @@ package org.thingsboard.server.queue.provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
@@ -55,17 +59,17 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
- public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() {
- TbAwsSqsProducerTemplate> producerTemplate =
+ public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() {
+ TbAwsSqsProducerTemplate> producerTemplate =
new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
- TbAwsSqsConsumerTemplate> consumerTemplate =
+ TbAwsSqsConsumerTemplate> consumerTemplate =
new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
- msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
- , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder();
+ , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(admin);
templateBuilder.requestTemplate(producerTemplate);
templateBuilder.responseTemplate(consumerTemplate);
@@ -76,18 +80,18 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
}
@Override
- public TbQueueProducer> createRuleEngineMsgProducer() {
+ public TbQueueProducer> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
}
@Override
- public TbQueueProducer> createTbCoreMsgProducer() {
+ public TbQueueProducer> createTbCoreMsgProducer() {
return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
}
@Override
- public TbQueueConsumer> createTransportNotificationsConsumer() {
+ public TbQueueConsumer> createTransportNotificationsConsumer() {
return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(),
- msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
index b40876d661..a23c93e015 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
@@ -26,14 +26,14 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@Slf4j
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java
index 9914cf65f9..c779127e1f 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java
@@ -19,22 +19,22 @@ import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
-import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
-import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.TbQueueRequestTemplate;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
-import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
-import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.TbQueueRequestTemplate;
+import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
index 8b84d08bff..fa36f8284b 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
@@ -26,17 +26,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@Component
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
index 7083553abb..5e46d67eed 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
@@ -26,16 +26,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'")
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
index 9b8a9c4ba0..eef10f35dc 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
@@ -24,15 +24,15 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@Component
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java
index 4aa35e5d68..f00bd7caef 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java
@@ -27,11 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
-import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.TbQueueProducer;
-import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
-import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@@ -39,6 +35,10 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@Component
@@ -54,8 +54,6 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
- private TbQueueProducer> tbCoreProducer;
-
public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java
new file mode 100644
index 0000000000..914e200fc9
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.provider;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.queue.ServiceType;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.queue.discovery.PartitionService;
+import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
+import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
+
+@Component
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'")
+public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
+
+ private final PartitionService partitionService;
+ private final TbQueueCoreSettings coreSettings;
+ private final TbServiceInfoProvider serviceInfoProvider;
+ private final TbQueueRuleEngineSettings ruleEngineSettings;
+ private final TbQueueTransportApiSettings transportApiSettings;
+ private final TbQueueTransportNotificationSettings transportNotificationSettings;
+ private final TbServiceBusSettings serviceBusSettings;
+ private final TbQueueAdmin admin;
+
+ public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
+ TbQueueRuleEngineSettings ruleEngineSettings,
+ TbServiceInfoProvider serviceInfoProvider,
+ TbQueueTransportApiSettings transportApiSettings,
+ TbQueueTransportNotificationSettings transportNotificationSettings,
+ TbServiceBusSettings serviceBusSettings,
+ TbQueueAdmin admin) {
+ this.partitionService = partitionService;
+ this.coreSettings = coreSettings;
+ this.serviceInfoProvider = serviceInfoProvider;
+ this.ruleEngineSettings = ruleEngineSettings;
+ this.transportApiSettings = transportApiSettings;
+ this.transportNotificationSettings = transportNotificationSettings;
+ this.serviceBusSettings = serviceBusSettings;
+ this.admin = admin;
+ }
+
+ @Override
+ public TbQueueProducer> createTransportNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createToCoreMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createToCoreNotificationsMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createTransportApiRequestConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueProducer> createTransportApiResponseProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic());
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java
new file mode 100644
index 0000000000..334af58133
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueProvider.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.provider;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.queue.ServiceType;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.queue.discovery.PartitionService;
+import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+
+@Component
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'")
+public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory {
+
+ private final TbServiceBusSettings serviceBusSettings;
+ private final TbQueueRuleEngineSettings ruleEngineSettings;
+ private final TbQueueCoreSettings coreSettings;
+ private final TbQueueTransportApiSettings transportApiSettings;
+ private final PartitionService partitionService;
+ private final TbServiceInfoProvider serviceInfoProvider;
+ private final TbQueueAdmin admin;
+
+ public ServiceBusTbCoreQueueProvider(TbServiceBusSettings serviceBusSettings,
+ TbQueueCoreSettings coreSettings,
+ TbQueueTransportApiSettings transportApiSettings,
+ TbQueueRuleEngineSettings ruleEngineSettings,
+ PartitionService partitionService,
+ TbServiceInfoProvider serviceInfoProvider,
+ TbQueueAdmin admin) {
+ this.serviceBusSettings = serviceBusSettings;
+ this.coreSettings = coreSettings;
+ this.transportApiSettings = transportApiSettings;
+ this.ruleEngineSettings = ruleEngineSettings;
+ this.partitionService = partitionService;
+ this.serviceInfoProvider = serviceInfoProvider;
+ this.admin = admin;
+ }
+
+ @Override
+ public TbQueueProducer> createTransportNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueConsumer> createToCoreMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createToCoreNotificationsMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createTransportApiRequestConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueProducer> createTransportApiResponseProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java
new file mode 100644
index 0000000000..3ceb837606
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.provider;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.queue.ServiceType;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.queue.discovery.PartitionService;
+import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
+import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
+import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
+
+@Component
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'")
+public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
+
+ private final PartitionService partitionService;
+ private final TbQueueCoreSettings coreSettings;
+ private final TbServiceInfoProvider serviceInfoProvider;
+ private final TbQueueRuleEngineSettings ruleEngineSettings;
+ private final TbServiceBusSettings serviceBusSettings;
+ private final TbQueueAdmin admin;
+
+ public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
+ TbQueueRuleEngineSettings ruleEngineSettings,
+ TbServiceInfoProvider serviceInfoProvider,
+ TbServiceBusSettings serviceBusSettings,
+ TbQueueAdmin admin) {
+ this.partitionService = partitionService;
+ this.coreSettings = coreSettings;
+ this.serviceInfoProvider = serviceInfoProvider;
+ this.ruleEngineSettings = ruleEngineSettings;
+ this.serviceBusSettings = serviceBusSettings;
+ this.admin = admin;
+ }
+
+ @Override
+ public TbQueueProducer> createTransportNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreNotificationsMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
+ }
+
+ @Override
+ public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+
+ @Override
+ public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java
new file mode 100644
index 0000000000..d38a4389a3
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.queue.provider;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.queue.TbQueueAdmin;
+import org.thingsboard.server.queue.TbQueueConsumer;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.TbQueueRequestTemplate;
+import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
+import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
+import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
+
+@Component
+@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
+@Slf4j
+public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory {
+ private final TbQueueTransportApiSettings transportApiSettings;
+ private final TbQueueTransportNotificationSettings transportNotificationSettings;
+ private final TbServiceBusSettings serviceBusSettings;
+ private final TbQueueAdmin admin;
+ private final TbServiceInfoProvider serviceInfoProvider;
+
+ public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
+ TbQueueTransportNotificationSettings transportNotificationSettings,
+ TbServiceBusSettings serviceBusSettings,
+ TbServiceInfoProvider serviceInfoProvider,
+ TbQueueAdmin admin) {
+ this.transportApiSettings = transportApiSettings;
+ this.transportNotificationSettings = transportNotificationSettings;
+ this.serviceBusSettings = serviceBusSettings;
+ this.admin = admin;
+ this.serviceInfoProvider = serviceInfoProvider;
+ }
+
+ @Override
+ public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() {
+ TbQueueProducer> producerTemplate =
+ new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
+
+ TbQueueConsumer> consumerTemplate =
+ new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
+
+ DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
+ , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder();
+ templateBuilder.queueAdmin(admin);
+ templateBuilder.requestTemplate(producerTemplate);
+ templateBuilder.responseTemplate(consumerTemplate);
+ templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
+ templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
+ templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
+ return templateBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createRuleEngineMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
+ }
+
+ @Override
+ public TbQueueProducer> createTbCoreMsgProducer() {
+ return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
+ }
+
+ @Override
+ public TbQueueConsumer> createTransportNotificationsConsumer() {
+ return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
+ transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
+ msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java
index 4109e72070..5dc795739a 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java
@@ -20,7 +20,6 @@ import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
-import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -36,15 +35,12 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
-import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
-import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -139,7 +135,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo
subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());
subscriptionNames.forEach(admin::createTopicIfNotExists);
consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());
- messagesPerTopic = pubSubSettings.getMaxMessages()/subscriptionNames.size();
+ messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
subscribed = true;
}
List messages;
@@ -217,11 +213,6 @@ public class TbPubSubConsumerTemplate implements TbQueueCo
public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
- TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
- Map headerMap = gson.fromJson(message.getAttributesMap().get("headers"), new TypeToken
+
+ com.microsoft.azure
+ azure-servicebus
+ ${azure-servicebus.version}
+
org.passay
passay
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index a875d56782..37ac4c14c2 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -63,7 +63,7 @@ transport:
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
queue:
- type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub
+ type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus
kafka:
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
@@ -83,6 +83,11 @@ queue:
ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
+ service_bus:
+ namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
+ sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
+ sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
+ max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
partitions:
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"