Azure ServiceBus queue
* created event hubs queue * created servicebus queue * refactored * refactored
This commit is contained in:
		
							parent
							
								
									ff3fd89ace
								
							
						
					
					
						commit
						45bd764e6f
					
				@ -5,7 +5,7 @@
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,7 @@
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 | 
			
		||||
@ -517,7 +517,7 @@ swagger:
 | 
			
		||||
  version: "${SWAGGER_VERSION:2.0}"
 | 
			
		||||
 | 
			
		||||
queue:
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus
 | 
			
		||||
  kafka:
 | 
			
		||||
    bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
 | 
			
		||||
    acks: "${TB_KAFKA_ACKS:all}"
 | 
			
		||||
@ -537,6 +537,11 @@ queue:
 | 
			
		||||
    ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
 | 
			
		||||
    max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
 | 
			
		||||
    max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
 | 
			
		||||
  service_bus:
 | 
			
		||||
    namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
 | 
			
		||||
    sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
 | 
			
		||||
    sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
 | 
			
		||||
    max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
 | 
			
		||||
  partitions:
 | 
			
		||||
    hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
 | 
			
		||||
    virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,7 @@
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 | 
			
		||||
@ -60,6 +60,10 @@
 | 
			
		||||
            <groupId>com.google.cloud</groupId>
 | 
			
		||||
            <artifactId>google-cloud-pubsub</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>com.microsoft.azure</groupId>
 | 
			
		||||
            <artifactId>azure-servicebus</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>org.springframework</groupId>
 | 
			
		||||
            <artifactId>spring-context-support</artifactId>
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,79 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.azure.servicebus;
 | 
			
		||||
 | 
			
		||||
import com.microsoft.azure.servicebus.management.ManagementClient;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
 | 
			
		||||
public class TbServiceBusAdmin implements TbQueueAdmin {
 | 
			
		||||
 | 
			
		||||
    private final Set<String> queues = ConcurrentHashMap.newKeySet();
 | 
			
		||||
 | 
			
		||||
    private final ManagementClient client;
 | 
			
		||||
 | 
			
		||||
    public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) {
 | 
			
		||||
        ConnectionStringBuilder builder = new ConnectionStringBuilder(
 | 
			
		||||
                serviceBusSettings.getNamespaceName(),
 | 
			
		||||
                "queues",
 | 
			
		||||
                serviceBusSettings.getSasKeyName(),
 | 
			
		||||
                serviceBusSettings.getSasKey());
 | 
			
		||||
 | 
			
		||||
        client = new ManagementClient(builder);
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            client.getQueues().forEach(queueDescription -> queues.add(queueDescription.getPath()));
 | 
			
		||||
        } catch (ServiceBusException | InterruptedException e) {
 | 
			
		||||
            log.error("Failed to get queues.", e);
 | 
			
		||||
            throw new RuntimeException("Failed to get queues.", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createTopicIfNotExists(String topic) {
 | 
			
		||||
        if (queues.contains(topic)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            client.createQueue(topic);
 | 
			
		||||
            queues.add(topic);
 | 
			
		||||
        } catch (ServiceBusException | InterruptedException e) {
 | 
			
		||||
            log.error("Failed to create queue: [{}]", topic, e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    private void destroy() {
 | 
			
		||||
        try {
 | 
			
		||||
            client.close();
 | 
			
		||||
        } catch (IOException e) {
 | 
			
		||||
            log.error("Failed to close ManagementClient.");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,210 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.azure.servicebus;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.Gson;
 | 
			
		||||
import com.google.protobuf.InvalidProtocolBufferException;
 | 
			
		||||
import com.microsoft.azure.servicebus.TransactionContext;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.SettleModePair;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.qpid.proton.amqp.messaging.Data;
 | 
			
		||||
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 | 
			
		||||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 | 
			
		||||
import org.springframework.util.CollectionUtils;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
 | 
			
		||||
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
    private final String topic;
 | 
			
		||||
    private final TbQueueMsgDecoder<T> decoder;
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
 | 
			
		||||
    private final Gson gson = new Gson();
 | 
			
		||||
 | 
			
		||||
    private Set<CoreMessageReceiver> receivers;
 | 
			
		||||
    private volatile Set<TopicPartitionInfo> partitions;
 | 
			
		||||
    private volatile boolean subscribed;
 | 
			
		||||
    private volatile boolean stopped = false;
 | 
			
		||||
    private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
 | 
			
		||||
    private volatile int messagesPerQueue;
 | 
			
		||||
 | 
			
		||||
    public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
        this.decoder = decoder;
 | 
			
		||||
        this.topic = topic;
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getTopic() {
 | 
			
		||||
        return topic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void subscribe() {
 | 
			
		||||
        partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
 | 
			
		||||
        subscribed = false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void subscribe(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        this.partitions = partitions;
 | 
			
		||||
        subscribed = false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void unsubscribe() {
 | 
			
		||||
        stopped = true;
 | 
			
		||||
        receivers.forEach(CoreMessageReceiver::closeAsync);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public List<T> poll(long durationInMillis) {
 | 
			
		||||
        if (!subscribed && partitions == null) {
 | 
			
		||||
            try {
 | 
			
		||||
                Thread.sleep(durationInMillis);
 | 
			
		||||
            } catch (InterruptedException e) {
 | 
			
		||||
                log.debug("Failed to await subscription", e);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            if (!subscribed) {
 | 
			
		||||
                createReceivers();
 | 
			
		||||
                messagesPerQueue = receivers.size() / partitions.size();
 | 
			
		||||
                subscribed = true;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
 | 
			
		||||
                    receivers.stream()
 | 
			
		||||
                            .map(receiver -> receiver
 | 
			
		||||
                                    .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
 | 
			
		||||
                                    .whenComplete((messages, err) -> {
 | 
			
		||||
                                        if (!CollectionUtils.isEmpty(messages)) {
 | 
			
		||||
                                            pendingMessages.put(receiver, messages);
 | 
			
		||||
                                        } else if (err != null) {
 | 
			
		||||
                                            log.error("Failed to receive messages.", err);
 | 
			
		||||
                                        }
 | 
			
		||||
                                    }))
 | 
			
		||||
                            .collect(Collectors.toList());
 | 
			
		||||
            try {
 | 
			
		||||
                return fromList(messageFutures)
 | 
			
		||||
                        .get()
 | 
			
		||||
                        .stream()
 | 
			
		||||
                        .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
 | 
			
		||||
                        .map(message -> {
 | 
			
		||||
                            try {
 | 
			
		||||
                                return decode(message);
 | 
			
		||||
                            } catch (InvalidProtocolBufferException e) {
 | 
			
		||||
                                log.error("Failed to parse message.", e);
 | 
			
		||||
                                throw new RuntimeException("Failed to parse message.", e);
 | 
			
		||||
                            }
 | 
			
		||||
                        }).collect(Collectors.toList());
 | 
			
		||||
            } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                if (stopped) {
 | 
			
		||||
                    log.info("[{}] Service Bus consumer is stopped.", topic);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.error("Failed to receive messages", e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return Collections.emptyList();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void createReceivers() {
 | 
			
		||||
        List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream()
 | 
			
		||||
                .map(TopicPartitionInfo::getFullTopicName)
 | 
			
		||||
                .map(queue -> {
 | 
			
		||||
                    MessagingFactory factory;
 | 
			
		||||
                    try {
 | 
			
		||||
                        factory = MessagingFactory.createFromConnectionStringBuilder(createConnection(queue));
 | 
			
		||||
                    } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                        log.error("Failed to create factory for the queue [{}]", queue);
 | 
			
		||||
                        throw new RuntimeException("Failed to create the factory", e);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    return CoreMessageReceiver.create(factory, queue, queue, 0,
 | 
			
		||||
                            new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND),
 | 
			
		||||
                            MessagingEntityType.QUEUE);
 | 
			
		||||
                }).collect(Collectors.toList());
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            receivers = new HashSet<>(fromList(receiverFutures).get());
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            if (stopped) {
 | 
			
		||||
                log.info("[{}] Service Bus consumer is stopped.", topic);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.error("Failed to create receivers", e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ConnectionStringBuilder createConnection(String queue) {
 | 
			
		||||
        admin.createTopicIfNotExists(queue);
 | 
			
		||||
        return new ConnectionStringBuilder(
 | 
			
		||||
                serviceBusSettings.getNamespaceName(),
 | 
			
		||||
                queue,
 | 
			
		||||
                serviceBusSettings.getSasKeyName(),
 | 
			
		||||
                serviceBusSettings.getSasKey());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <V> CompletableFuture<List<V>> fromList(List<CompletableFuture<V>> futures) {
 | 
			
		||||
        CompletableFuture<Collection<V>>[] arrayFuture = new CompletableFuture[futures.size()];
 | 
			
		||||
        futures.toArray(arrayFuture);
 | 
			
		||||
 | 
			
		||||
        return CompletableFuture
 | 
			
		||||
                .allOf(arrayFuture)
 | 
			
		||||
                .thenApply(v -> futures
 | 
			
		||||
                        .stream()
 | 
			
		||||
                        .map(CompletableFuture::join)
 | 
			
		||||
                        .collect(Collectors.toList()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void commit() {
 | 
			
		||||
        pendingMessages.forEach((receiver, msgs) ->
 | 
			
		||||
                msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
 | 
			
		||||
        pendingMessages.clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
 | 
			
		||||
        DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
 | 
			
		||||
        return decoder.decode(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,110 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.azure.servicebus;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.Gson;
 | 
			
		||||
import com.microsoft.azure.servicebus.IMessage;
 | 
			
		||||
import com.microsoft.azure.servicebus.Message;
 | 
			
		||||
import com.microsoft.azure.servicebus.QueueClient;
 | 
			
		||||
import com.microsoft.azure.servicebus.ReceiveMode;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
 | 
			
		||||
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
 | 
			
		||||
    private final String defaultTopic;
 | 
			
		||||
    private final Gson gson = new Gson();
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
    private final Map<String, QueueClient> clients = new HashMap<>();
 | 
			
		||||
    private ExecutorService executorService;
 | 
			
		||||
 | 
			
		||||
    public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
        this.defaultTopic = defaultTopic;
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
        executorService = Executors.newSingleThreadExecutor();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init() {
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDefaultTopic() {
 | 
			
		||||
        return defaultTopic;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
 | 
			
		||||
        IMessage message = new Message(gson.toJson(new DefaultTbQueueMsg(msg)));
 | 
			
		||||
        CompletableFuture<Void> future = getClient(tpi.getFullTopicName()).sendAsync(message);
 | 
			
		||||
        future.whenCompleteAsync((success, err) -> {
 | 
			
		||||
            if (err != null) {
 | 
			
		||||
                callback.onFailure(err);
 | 
			
		||||
            } else {
 | 
			
		||||
                callback.onSuccess(null);
 | 
			
		||||
            }
 | 
			
		||||
        }, executorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void stop() {
 | 
			
		||||
        clients.forEach((t, client) -> {
 | 
			
		||||
            try {
 | 
			
		||||
                client.close();
 | 
			
		||||
            } catch (ServiceBusException e) {
 | 
			
		||||
                log.error("Failed to close QueueClient.", e);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        if (executorService != null) {
 | 
			
		||||
            executorService.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private QueueClient getClient(String topic) {
 | 
			
		||||
        return clients.computeIfAbsent(topic, k -> {
 | 
			
		||||
            admin.createTopicIfNotExists(topic);
 | 
			
		||||
            ConnectionStringBuilder builder =
 | 
			
		||||
                    new ConnectionStringBuilder(
 | 
			
		||||
                            serviceBusSettings.getNamespaceName(),
 | 
			
		||||
                            topic,
 | 
			
		||||
                            serviceBusSettings.getSasKeyName(),
 | 
			
		||||
                            serviceBusSettings.getSasKey());
 | 
			
		||||
            try {
 | 
			
		||||
                return new QueueClient(builder, ReceiveMode.PEEKLOCK);
 | 
			
		||||
            } catch (InterruptedException | ServiceBusException e) {
 | 
			
		||||
                log.error("Failed to create new client for the Queue: [{}]", topic, e);
 | 
			
		||||
                throw new RuntimeException("Failed to create new client for the Queue", e);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,37 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.azure.servicebus;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
 | 
			
		||||
@Component
 | 
			
		||||
@Data
 | 
			
		||||
public class TbServiceBusSettings {
 | 
			
		||||
    @Value("${queue.service_bus.namespace_name}")
 | 
			
		||||
    private String namespaceName;
 | 
			
		||||
    @Value("${queue.service_bus.sas_key_name}")
 | 
			
		||||
    private String sasKeyName;
 | 
			
		||||
    @Value("${queue.service_bus.sas_key}")
 | 
			
		||||
    private String sasKey;
 | 
			
		||||
    @Value("${queue.service_bus.max_messages}")
 | 
			
		||||
    private int maxMessages;
 | 
			
		||||
}
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.common;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.annotations.Expose;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
 | 
			
		||||
@ -26,12 +25,20 @@ import java.util.UUID;
 | 
			
		||||
public class DefaultTbQueueMsg implements TbQueueMsg {
 | 
			
		||||
    private final UUID key;
 | 
			
		||||
    private final byte[] data;
 | 
			
		||||
    private DefaultTbQueueMsgHeaders headers;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    public DefaultTbQueueMsg(UUID key, byte[] data) {
 | 
			
		||||
        this.key = key;
 | 
			
		||||
        this.data = data;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Expose(serialize = false, deserialize = false)
 | 
			
		||||
    private TbQueueMsgHeaders headers;
 | 
			
		||||
    public DefaultTbQueueMsg(TbQueueMsg msg) {
 | 
			
		||||
        this.key = msg.getKey();
 | 
			
		||||
        this.data = msg.getData();
 | 
			
		||||
        DefaultTbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
 | 
			
		||||
        msg.getHeaders().getData().forEach(headers::put);
 | 
			
		||||
        this.headers = headers;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.LinkedHashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
 | 
			
		||||
@ -21,14 +21,14 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
 | 
			
		||||
 | 
			
		||||
@ -18,21 +18,22 @@ package org.thingsboard.server.queue.provider;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
 | 
			
		||||
@ -48,8 +49,6 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
 | 
			
		||||
    public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,
 | 
			
		||||
@ -78,7 +77,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
 | 
			
		||||
        return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -88,7 +87,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
 | 
			
		||||
        return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -99,15 +98,16 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
 | 
			
		||||
                partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -24,12 +24,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
 | 
			
		||||
@ -86,7 +86,8 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -18,16 +18,20 @@ package org.thingsboard.server.queue.provider;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
 | 
			
		||||
@ -55,17 +59,17 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() {
 | 
			
		||||
        TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate =
 | 
			
		||||
    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
 | 
			
		||||
        TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
 | 
			
		||||
                new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
 | 
			
		||||
        TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate =
 | 
			
		||||
        TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
 | 
			
		||||
                new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
 | 
			
		||||
                        transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
 | 
			
		||||
                        msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
                        msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
 | 
			
		||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
			
		||||
                <TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
 | 
			
		||||
                <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
 | 
			
		||||
        templateBuilder.queueAdmin(admin);
 | 
			
		||||
        templateBuilder.requestTemplate(producerTemplate);
 | 
			
		||||
        templateBuilder.responseTemplate(consumerTemplate);
 | 
			
		||||
@ -76,18 +80,18 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
        return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
        return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() {
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() {
 | 
			
		||||
        return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -26,14 +26,14 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
 | 
			
		||||
@ -19,22 +19,22 @@ import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
 | 
			
		||||
 | 
			
		||||
@ -26,17 +26,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
 | 
			
		||||
@ -26,16 +26,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'")
 | 
			
		||||
 | 
			
		||||
@ -24,15 +24,15 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
 | 
			
		||||
@ -27,11 +27,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
@ -39,6 +35,10 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -54,8 +54,6 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
 | 
			
		||||
    private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreProducer;
 | 
			
		||||
 | 
			
		||||
    public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
 | 
			
		||||
                                      TbQueueCoreSettings coreSettings,
 | 
			
		||||
                                      TbQueueRuleEngineSettings ruleEngineSettings,
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,134 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.provider;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'")
 | 
			
		||||
public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
 | 
			
		||||
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbQueueCoreSettings coreSettings;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
			
		||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
			
		||||
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
 | 
			
		||||
    public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
 | 
			
		||||
                                          TbQueueRuleEngineSettings ruleEngineSettings,
 | 
			
		||||
                                          TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                          TbQueueTransportApiSettings transportApiSettings,
 | 
			
		||||
                                          TbQueueTransportNotificationSettings transportNotificationSettings,
 | 
			
		||||
                                          TbServiceBusSettings serviceBusSettings,
 | 
			
		||||
                                          TbQueueAdmin admin) {
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.coreSettings = coreSettings;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.ruleEngineSettings = ruleEngineSettings;
 | 
			
		||||
        this.transportApiSettings = transportApiSettings;
 | 
			
		||||
        this.transportNotificationSettings = transportNotificationSettings;
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,116 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.provider;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'")
 | 
			
		||||
public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory {
 | 
			
		||||
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
			
		||||
    private final TbQueueCoreSettings coreSettings;
 | 
			
		||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
 | 
			
		||||
    public ServiceBusTbCoreQueueProvider(TbServiceBusSettings serviceBusSettings,
 | 
			
		||||
                                         TbQueueCoreSettings coreSettings,
 | 
			
		||||
                                         TbQueueTransportApiSettings transportApiSettings,
 | 
			
		||||
                                         TbQueueRuleEngineSettings ruleEngineSettings,
 | 
			
		||||
                                         PartitionService partitionService,
 | 
			
		||||
                                         TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                         TbQueueAdmin admin) {
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
        this.coreSettings = coreSettings;
 | 
			
		||||
        this.transportApiSettings = transportApiSettings;
 | 
			
		||||
        this.ruleEngineSettings = ruleEngineSettings;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,99 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.provider;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'")
 | 
			
		||||
public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
			
		||||
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TbQueueCoreSettings coreSettings;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
 | 
			
		||||
    public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
 | 
			
		||||
                                              TbQueueRuleEngineSettings ruleEngineSettings,
 | 
			
		||||
                                              TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                              TbServiceBusSettings serviceBusSettings,
 | 
			
		||||
                                              TbQueueAdmin admin) {
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.coreSettings = coreSettings;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.ruleEngineSettings = ruleEngineSettings;
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,94 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.provider;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory {
 | 
			
		||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
			
		||||
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
 | 
			
		||||
    private final TbServiceBusSettings serviceBusSettings;
 | 
			
		||||
    private final TbQueueAdmin admin;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
 | 
			
		||||
    public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
 | 
			
		||||
                                            TbQueueTransportNotificationSettings transportNotificationSettings,
 | 
			
		||||
                                            TbServiceBusSettings serviceBusSettings,
 | 
			
		||||
                                            TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                            TbQueueAdmin admin) {
 | 
			
		||||
        this.transportApiSettings = transportApiSettings;
 | 
			
		||||
        this.transportNotificationSettings = transportNotificationSettings;
 | 
			
		||||
        this.serviceBusSettings = serviceBusSettings;
 | 
			
		||||
        this.admin = admin;
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() {
 | 
			
		||||
        TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate =
 | 
			
		||||
                new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
 | 
			
		||||
        TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate =
 | 
			
		||||
                new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                        transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
 | 
			
		||||
                        msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
 | 
			
		||||
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
 | 
			
		||||
                <TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
 | 
			
		||||
        templateBuilder.queueAdmin(admin);
 | 
			
		||||
        templateBuilder.requestTemplate(producerTemplate);
 | 
			
		||||
        templateBuilder.responseTemplate(consumerTemplate);
 | 
			
		||||
        templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
 | 
			
		||||
        templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
 | 
			
		||||
        templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
 | 
			
		||||
        return templateBuilder.build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
 | 
			
		||||
        return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() {
 | 
			
		||||
        return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings,
 | 
			
		||||
                transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
 | 
			
		||||
                msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -20,7 +20,6 @@ import com.google.api.core.ApiFutures;
 | 
			
		||||
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
 | 
			
		||||
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 | 
			
		||||
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
 | 
			
		||||
import com.google.common.reflect.TypeToken;
 | 
			
		||||
import com.google.gson.Gson;
 | 
			
		||||
import com.google.protobuf.InvalidProtocolBufferException;
 | 
			
		||||
import com.google.pubsub.v1.AcknowledgeRequest;
 | 
			
		||||
@ -36,15 +35,12 @@ import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.CopyOnWriteArrayList;
 | 
			
		||||
@ -139,7 +135,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
 | 
			
		||||
                subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());
 | 
			
		||||
                subscriptionNames.forEach(admin::createTopicIfNotExists);
 | 
			
		||||
                consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());
 | 
			
		||||
                messagesPerTopic = pubSubSettings.getMaxMessages()/subscriptionNames.size();
 | 
			
		||||
                messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
 | 
			
		||||
                subscribed = true;
 | 
			
		||||
            }
 | 
			
		||||
            List<ReceivedMessage> messages;
 | 
			
		||||
@ -217,11 +213,6 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
 | 
			
		||||
 | 
			
		||||
    public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
 | 
			
		||||
        DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
 | 
			
		||||
        TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
 | 
			
		||||
        Map<String, byte[]> headerMap = gson.fromJson(message.getAttributesMap().get("headers"), new TypeToken<Map<String, byte[]>>() {
 | 
			
		||||
        }.getType());
 | 
			
		||||
        headerMap.forEach(headers::put);
 | 
			
		||||
        msg.setHeaders(headers);
 | 
			
		||||
        return decoder.decode(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -71,7 +71,6 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
 | 
			
		||||
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
 | 
			
		||||
        PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder();
 | 
			
		||||
        pubsubMessageBuilder.setData(getMsg(msg));
 | 
			
		||||
        pubsubMessageBuilder.putAttributes("headers", gson.toJson(msg.getHeaders().getData()));
 | 
			
		||||
 | 
			
		||||
        Publisher publisher = getOrCreatePublisher(tpi.getFullTopicName());
 | 
			
		||||
        ApiFuture<String> future = publisher.publish(pubsubMessageBuilder.build());
 | 
			
		||||
@ -110,7 +109,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ByteString getMsg(T msg) {
 | 
			
		||||
        String json = gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData()));
 | 
			
		||||
        String json = gson.toJson(new DefaultTbQueueMsg(msg));
 | 
			
		||||
        return ByteString.copyFrom(json.getBytes());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -23,7 +23,6 @@ import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
 | 
			
		||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
 | 
			
		||||
import com.amazonaws.services.sqs.model.Message;
 | 
			
		||||
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 | 
			
		||||
import com.google.common.reflect.TypeToken;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
@ -38,15 +37,12 @@ import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.CopyOnWriteArrayList;
 | 
			
		||||
@ -161,7 +157,7 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
 | 
			
		||||
                if (stopped) {
 | 
			
		||||
                    log.info("[{}] Aws SQS consumer is stopped.", topic);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.error("Failed to pool messages.",  e);
 | 
			
		||||
                    log.error("Failed to pool messages.", e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -214,11 +210,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
 | 
			
		||||
 | 
			
		||||
    public T decode(Message message) throws InvalidProtocolBufferException {
 | 
			
		||||
        DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
 | 
			
		||||
        TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
 | 
			
		||||
        Map<String, byte[]> headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken<Map<String, byte[]>>() {
 | 
			
		||||
        }.getType());
 | 
			
		||||
        headerMap.forEach(headers::put);
 | 
			
		||||
        msg.setHeaders(headers);
 | 
			
		||||
        return decoder.decode(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,7 +20,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
 | 
			
		||||
import com.amazonaws.auth.BasicAWSCredentials;
 | 
			
		||||
import com.amazonaws.services.sqs.AmazonSQS;
 | 
			
		||||
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
 | 
			
		||||
import com.amazonaws.services.sqs.model.MessageAttributeValue;
 | 
			
		||||
import com.amazonaws.services.sqs.model.SendMessageRequest;
 | 
			
		||||
import com.amazonaws.services.sqs.model.SendMessageResult;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
@ -37,7 +36,6 @@ import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
@ -82,13 +80,6 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
 | 
			
		||||
        sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
 | 
			
		||||
        sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData())));
 | 
			
		||||
 | 
			
		||||
        Map<String, MessageAttributeValue> attributes = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
        attributes.put("headers", new MessageAttributeValue()
 | 
			
		||||
                .withStringValue(gson.toJson(msg.getHeaders().getData()))
 | 
			
		||||
                .withDataType("String"));
 | 
			
		||||
 | 
			
		||||
        sendMsgRequest.withMessageAttributes(attributes);
 | 
			
		||||
        sendMsgRequest.withMessageGroupId(msg.getKey().toString());
 | 
			
		||||
        ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							@ -94,6 +94,7 @@
 | 
			
		||||
        <snakeyaml.version>1.23</snakeyaml.version>
 | 
			
		||||
        <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
 | 
			
		||||
        <pubsub.client.version>1.84.0</pubsub.client.version>
 | 
			
		||||
        <azure-servicebus.version>3.2.0</azure-servicebus.version>
 | 
			
		||||
        <passay.version>1.5.0</passay.version>
 | 
			
		||||
        <ua-parser.version>1.4.3</ua-parser.version>
 | 
			
		||||
    </properties>
 | 
			
		||||
@ -898,6 +899,11 @@
 | 
			
		||||
                <artifactId>google-cloud-pubsub</artifactId>
 | 
			
		||||
                <version>${pubsub.client.version}</version>
 | 
			
		||||
            </dependency>
 | 
			
		||||
            <dependency>
 | 
			
		||||
                <groupId>com.microsoft.azure</groupId>
 | 
			
		||||
                <artifactId>azure-servicebus</artifactId>
 | 
			
		||||
                <version>${azure-servicebus.version}</version>
 | 
			
		||||
            </dependency>
 | 
			
		||||
            <dependency>
 | 
			
		||||
                <groupId>org.passay</groupId>
 | 
			
		||||
                <artifactId>passay</artifactId>
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@ transport:
 | 
			
		||||
    max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
 | 
			
		||||
 | 
			
		||||
queue:
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus
 | 
			
		||||
  kafka:
 | 
			
		||||
    bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
 | 
			
		||||
    acks: "${TB_KAFKA_ACKS:all}"
 | 
			
		||||
@ -83,6 +83,11 @@ queue:
 | 
			
		||||
    ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
 | 
			
		||||
    max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
 | 
			
		||||
    max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
 | 
			
		||||
  service_bus:
 | 
			
		||||
    namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
 | 
			
		||||
    sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
 | 
			
		||||
    sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
 | 
			
		||||
    max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
 | 
			
		||||
  partitions:
 | 
			
		||||
    hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
 | 
			
		||||
    virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user