diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java index 37c99dfccb..1e5873ab67 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java @@ -21,5 +21,5 @@ public interface TbQueueAdmin { void destroy(); - default void deleteTopic(String topic) { } + void deleteTopic(String topic); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index baf1ea0334..ae6a5146e0 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -41,7 +41,7 @@ public class TopicPartitionInfo { this.partition = partition; this.myPartition = myPartition; String tmp = topic; - if (tenantId != null) { + if (tenantId != null && !tenantId.isNullUid()) { tmp += "." + tenantId.getId().toString(); } if (partition != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java new file mode 100644 index 0000000000..1dd6c0045d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java @@ -0,0 +1,114 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; +import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; +import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; + +@Configuration +public class RuleEngineTbQueueAdminFactory { + + @Autowired(required = false) + private TbKafkaTopicConfigs kafkaTopicConfigs; + @Autowired(required = false) + private TbKafkaSettings kafkaSettings; + + @Autowired(required = false) + private TbAwsSqsQueueAttributes awsSqsQueueAttributes; + @Autowired(required = false) + private TbAwsSqsSettings awsSqsSettings; + + @Autowired(required = false) + private TbPubSubSubscriptionSettings pubSubSubscriptionSettings; + @Autowired(required = false) + private TbPubSubSettings pubSubSettings; + + @Autowired(required = false) + private TbRabbitMqQueueArguments rabbitMqQueueArguments; + @Autowired(required = false) + private TbRabbitMqSettings rabbitMqSettings; + + @Autowired(required = false) + private TbServiceBusQueueConfigs serviceBusQueueConfigs; + @Autowired(required = false) + private TbServiceBusSettings serviceBusSettings; + + @ConditionalOnExpression("'${queue.type:null}'=='kafka'") + @Bean + public TbQueueAdmin createKafkaAdmin() { + return new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); + } + + @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") + @Bean + public TbQueueAdmin createAwsSqsAdmin() { + return new TbAwsSqsAdmin(awsSqsSettings, awsSqsQueueAttributes.getRuleEngineAttributes()); + } + + @ConditionalOnExpression("'${queue.type:null}'=='pubsub'") + @Bean + public TbQueueAdmin createPubSubAdmin() { + return new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); + } + + @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") + @Bean + public TbQueueAdmin createRabbitMqAdmin() { + return new TbRabbitMqAdmin(rabbitMqSettings, rabbitMqQueueArguments.getRuleEngineArgs()); + } + + @ConditionalOnExpression("'${queue.type:null}'=='service-bus'") + @Bean + public TbQueueAdmin createServiceBusAdmin() { + return new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); + } + + @ConditionalOnExpression("'${queue.type:null}'=='in-memory'") + @Bean + public TbQueueAdmin createInMemoryAdmin() { + return new TbQueueAdmin() { + + @Override + public void createTopicIfNotExists(String topic) { + } + + @Override + public void deleteTopic(String topic) { + } + + @Override + public void destroy() { + } + }; + } +} \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index 71ec4b6811..c8f76a8eeb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -82,6 +82,31 @@ public class TbServiceBusAdmin implements TbQueueAdmin { } } + @Override + public void deleteTopic(String topic) { + if (queues.contains(topic)) { + doDelete(topic); + } else { + try { + if (client.getQueue(topic) != null) { + doDelete(topic); + } else { + log.warn("Azure Service Bus Queue [{}] is not exist.", topic); + } + } catch (ServiceBusException | InterruptedException e) { + log.error("Failed to delete Azure Service Bus queue [{}]", topic, e); + } + } + } + + private void doDelete(String topic) { + try { + client.deleteTopic(topic); + } catch (ServiceBusException | InterruptedException e) { + log.error("Failed to delete Azure Service Bus queue [{}]", topic, e); + } + } + private void setQueueConfigs(QueueDescription queueDescription) { queueConfigs.forEach((confKey, confValue) -> { switch (confKey) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 1034263690..d5a9a34dc7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -84,6 +84,23 @@ public class TbKafkaAdmin implements TbQueueAdmin { } + @Override + public void deleteTopic(String topic) { + if (topics.contains(topic)) { + client.deleteTopics(Collections.singletonList(topic)); + } else { + try { + if (client.listTopics().names().get().contains(topic)) { + client.deleteTopics(Collections.singletonList(topic)); + } else { + log.warn("Kafka topic [{}] does not exist.", topic); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to delete kafka topic [{}].", topic, e); + } + } + } + @Override public void destroy() { if (client != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index cb60272ace..252b70459e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -73,6 +73,9 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory @Override public void destroy() {} + + @Override + public void deleteTopic(String topic) {} }); templateBuilder.requestTemplate(producerTemplate); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java index 7fb7cc9463..06d082feb1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java @@ -136,6 +136,37 @@ public class TbPubSubAdmin implements TbQueueAdmin { createSubscriptionIfNotExists(partition, topicName); } + @Override + public void deleteTopic(String topic) { + TopicName topicName = TopicName.newBuilder() + .setTopic(topic) + .setProject(pubSubSettings.getProjectId()) + .build(); + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(pubSubSettings.getProjectId(), topic); + + if (topicSet.contains(topicName.toString())) { + topicAdminClient.deleteTopic(topicName); + } else { + if (topicAdminClient.getTopic(topicName) != null) { + topicAdminClient.deleteTopic(topicName); + } else { + log.warn("PubSub topic [{}] does not exist.", topic); + } + } + + if (subscriptionSet.contains(subscriptionName.toString())) { + subscriptionAdminClient.deleteSubscription(subscriptionName); + } else { + if (subscriptionAdminClient.getSubscription(subscriptionName) != null) { + subscriptionAdminClient.deleteSubscription(subscriptionName); + } else { + log.warn("PubSub subscription [{}] does not exist.", topic); + } + } + } + private void createSubscriptionIfNotExists(String partition, TopicName topicName) { ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java index d2e5172a6f..3c7c18ad93 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java @@ -58,6 +58,15 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { } } + @Override + public void deleteTopic(String topic) { + try { + channel.queueDelete(topic); + } catch (IOException e) { + log.error("Failed to delete RabbitMq queue [{}].", topic); + } + } + @Override public void destroy() { if (channel != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index 39ffc65e4a..3bd141f4db 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -23,18 +23,20 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; +@Slf4j public class TbAwsSqsAdmin implements TbQueueAdmin { private final Map attributes; private final AmazonSQS sqsClient; - private final Set queues; + private final Map queues; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -57,18 +59,37 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { .getQueueUrls() .stream() .map(this::getQueueNameFromUrl) - .collect(Collectors.toCollection(ConcurrentHashMap::newKeySet)); + .collect(Collectors.toMap(this::convertTopicToQueueName, Function.identity())); } @Override public void createTopicIfNotExists(String topic) { - String queueName = topic.replaceAll("\\.", "_") + ".fifo"; - if (queues.contains(queueName)) { + String queueName = convertTopicToQueueName(topic); + if (queues.containsKey(queueName)) { return; } final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes); String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl(); - queues.add(getQueueNameFromUrl(queueUrl)); + queues.put(getQueueNameFromUrl(queueUrl), queueUrl); + } + + private String convertTopicToQueueName(String topic) { + return topic.replaceAll("\\.", "_") + ".fifo"; + } + + @Override + public void deleteTopic(String topic) { + String queueName = convertTopicToQueueName(topic); + if (queues.containsKey(queueName)) { + sqsClient.deleteQueue(queues.get(queueName)); + } else { + GetQueueUrlResult queueUrl = sqsClient.getQueueUrl(queueName); + if (queueUrl != null) { + sqsClient.deleteQueue(queueUrl.getQueueUrl()); + } else { + log.warn("Aws SQS queue [{}] does not exist!", queueName); + } + } } private String getQueueNameFromUrl(String queueUrl) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java index cad7f9cb48..75978ea371 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java @@ -38,7 +38,6 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; -import org.thingsboard.server.dao.tenant.TenantDao; import org.thingsboard.server.queue.TbQueueAdmin; import java.util.List; @@ -51,9 +50,6 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ @Autowired private QueueDao queueDao; - @Autowired - private TenantDao tenantDao; - @Autowired private TbTenantProfileCache tenantProfileCache; @@ -101,21 +97,21 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ int oldPartitions = oldQueue.getPartitions(); int currentPartitions = queue.getPartitions(); - //TODO: 3.2 remove if partitions can't be deleted. -// if (currentPartitions != oldPartitions && tbQueueAdmin != null) { + //TODO: remove if partitions can't be deleted. + if (currentPartitions != oldPartitions && tbQueueAdmin != null) { // queueClusterService.onQueueDelete(queue, null); -// if (currentPartitions > oldPartitions) { -// log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); -// for (int i = oldPartitions; i < currentPartitions; i++) { -// tbQueueAdmin.createTopicIfNotExists(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); -// } -// } else { -// log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); -// for (int i = currentPartitions; i < oldPartitions; i++) { -// tbQueueAdmin.deleteTopic(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); -// } -// } -// } + if (currentPartitions > oldPartitions) { + log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); + for (int i = oldPartitions; i < currentPartitions; i++) { + tbQueueAdmin.createTopicIfNotExists(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); + } + } else { + log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); + for (int i = currentPartitions; i < oldPartitions; i++) { + tbQueueAdmin.deleteTopic(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); + } + } + } return updatedQueue; }