implemented deleteTopic for queues

This commit is contained in:
YevhenBondarenko 2022-03-28 22:38:31 +03:00
parent fd53d3aeb4
commit 3764ebfa1c
10 changed files with 243 additions and 27 deletions

View File

@ -21,5 +21,5 @@ public interface TbQueueAdmin {
void destroy(); void destroy();
default void deleteTopic(String topic) { } void deleteTopic(String topic);
} }

View File

@ -41,7 +41,7 @@ public class TopicPartitionInfo {
this.partition = partition; this.partition = partition;
this.myPartition = myPartition; this.myPartition = myPartition;
String tmp = topic; String tmp = topic;
if (tenantId != null) { if (tenantId != null && !tenantId.isNullUid()) {
tmp += "." + tenantId.getId().toString(); tmp += "." + tenantId.getId().toString();
} }
if (partition != null) { if (partition != null) {

View File

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes;
import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
@Configuration
public class RuleEngineTbQueueAdminFactory {
@Autowired(required = false)
private TbKafkaTopicConfigs kafkaTopicConfigs;
@Autowired(required = false)
private TbKafkaSettings kafkaSettings;
@Autowired(required = false)
private TbAwsSqsQueueAttributes awsSqsQueueAttributes;
@Autowired(required = false)
private TbAwsSqsSettings awsSqsSettings;
@Autowired(required = false)
private TbPubSubSubscriptionSettings pubSubSubscriptionSettings;
@Autowired(required = false)
private TbPubSubSettings pubSubSettings;
@Autowired(required = false)
private TbRabbitMqQueueArguments rabbitMqQueueArguments;
@Autowired(required = false)
private TbRabbitMqSettings rabbitMqSettings;
@Autowired(required = false)
private TbServiceBusQueueConfigs serviceBusQueueConfigs;
@Autowired(required = false)
private TbServiceBusSettings serviceBusSettings;
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
@Bean
public TbQueueAdmin createKafkaAdmin() {
return new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
}
@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'")
@Bean
public TbQueueAdmin createAwsSqsAdmin() {
return new TbAwsSqsAdmin(awsSqsSettings, awsSqsQueueAttributes.getRuleEngineAttributes());
}
@ConditionalOnExpression("'${queue.type:null}'=='pubsub'")
@Bean
public TbQueueAdmin createPubSubAdmin() {
return new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
}
@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
@Bean
public TbQueueAdmin createRabbitMqAdmin() {
return new TbRabbitMqAdmin(rabbitMqSettings, rabbitMqQueueArguments.getRuleEngineArgs());
}
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
@Bean
public TbQueueAdmin createServiceBusAdmin() {
return new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
}
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'")
@Bean
public TbQueueAdmin createInMemoryAdmin() {
return new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic) {
}
@Override
public void deleteTopic(String topic) {
}
@Override
public void destroy() {
}
};
}
}

View File

@ -82,6 +82,31 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
} }
} }
@Override
public void deleteTopic(String topic) {
if (queues.contains(topic)) {
doDelete(topic);
} else {
try {
if (client.getQueue(topic) != null) {
doDelete(topic);
} else {
log.warn("Azure Service Bus Queue [{}] is not exist.", topic);
}
} catch (ServiceBusException | InterruptedException e) {
log.error("Failed to delete Azure Service Bus queue [{}]", topic, e);
}
}
}
private void doDelete(String topic) {
try {
client.deleteTopic(topic);
} catch (ServiceBusException | InterruptedException e) {
log.error("Failed to delete Azure Service Bus queue [{}]", topic, e);
}
}
private void setQueueConfigs(QueueDescription queueDescription) { private void setQueueConfigs(QueueDescription queueDescription) {
queueConfigs.forEach((confKey, confValue) -> { queueConfigs.forEach((confKey, confValue) -> {
switch (confKey) { switch (confKey) {

View File

@ -84,6 +84,23 @@ public class TbKafkaAdmin implements TbQueueAdmin {
} }
@Override
public void deleteTopic(String topic) {
if (topics.contains(topic)) {
client.deleteTopics(Collections.singletonList(topic));
} else {
try {
if (client.listTopics().names().get().contains(topic)) {
client.deleteTopics(Collections.singletonList(topic));
} else {
log.warn("Kafka topic [{}] does not exist.", topic);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
}
}
@Override @Override
public void destroy() { public void destroy() {
if (client != null) { if (client != null) {

View File

@ -73,6 +73,9 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
@Override @Override
public void destroy() {} public void destroy() {}
@Override
public void deleteTopic(String topic) {}
}); });
templateBuilder.requestTemplate(producerTemplate); templateBuilder.requestTemplate(producerTemplate);

View File

@ -136,6 +136,37 @@ public class TbPubSubAdmin implements TbQueueAdmin {
createSubscriptionIfNotExists(partition, topicName); createSubscriptionIfNotExists(partition, topicName);
} }
@Override
public void deleteTopic(String topic) {
TopicName topicName = TopicName.newBuilder()
.setTopic(topic)
.setProject(pubSubSettings.getProjectId())
.build();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(pubSubSettings.getProjectId(), topic);
if (topicSet.contains(topicName.toString())) {
topicAdminClient.deleteTopic(topicName);
} else {
if (topicAdminClient.getTopic(topicName) != null) {
topicAdminClient.deleteTopic(topicName);
} else {
log.warn("PubSub topic [{}] does not exist.", topic);
}
}
if (subscriptionSet.contains(subscriptionName.toString())) {
subscriptionAdminClient.deleteSubscription(subscriptionName);
} else {
if (subscriptionAdminClient.getSubscription(subscriptionName) != null) {
subscriptionAdminClient.deleteSubscription(subscriptionName);
} else {
log.warn("PubSub subscription [{}] does not exist.", topic);
}
}
}
private void createSubscriptionIfNotExists(String partition, TopicName topicName) { private void createSubscriptionIfNotExists(String partition, TopicName topicName) {
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition);

View File

@ -58,6 +58,15 @@ public class TbRabbitMqAdmin implements TbQueueAdmin {
} }
} }
@Override
public void deleteTopic(String topic) {
try {
channel.queueDelete(topic);
} catch (IOException e) {
log.error("Failed to delete RabbitMq queue [{}].", topic);
}
}
@Override @Override
public void destroy() { public void destroy() {
if (channel != null) { if (channel != null) {

View File

@ -23,18 +23,20 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.function.Function;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j
public class TbAwsSqsAdmin implements TbQueueAdmin { public class TbAwsSqsAdmin implements TbQueueAdmin {
private final Map<String, String> attributes; private final Map<String, String> attributes;
private final AmazonSQS sqsClient; private final AmazonSQS sqsClient;
private final Set<String> queues; private final Map<String, String> queues;
public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) { public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) {
this.attributes = attributes; this.attributes = attributes;
@ -57,18 +59,37 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
.getQueueUrls() .getQueueUrls()
.stream() .stream()
.map(this::getQueueNameFromUrl) .map(this::getQueueNameFromUrl)
.collect(Collectors.toCollection(ConcurrentHashMap::newKeySet)); .collect(Collectors.toMap(this::convertTopicToQueueName, Function.identity()));
} }
@Override @Override
public void createTopicIfNotExists(String topic) { public void createTopicIfNotExists(String topic) {
String queueName = topic.replaceAll("\\.", "_") + ".fifo"; String queueName = convertTopicToQueueName(topic);
if (queues.contains(queueName)) { if (queues.containsKey(queueName)) {
return; return;
} }
final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes);
String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl(); String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl();
queues.add(getQueueNameFromUrl(queueUrl)); queues.put(getQueueNameFromUrl(queueUrl), queueUrl);
}
private String convertTopicToQueueName(String topic) {
return topic.replaceAll("\\.", "_") + ".fifo";
}
@Override
public void deleteTopic(String topic) {
String queueName = convertTopicToQueueName(topic);
if (queues.containsKey(queueName)) {
sqsClient.deleteQueue(queues.get(queueName));
} else {
GetQueueUrlResult queueUrl = sqsClient.getQueueUrl(queueName);
if (queueUrl != null) {
sqsClient.deleteQueue(queueUrl.getQueueUrl());
} else {
log.warn("Aws SQS queue [{}] does not exist!", queueName);
}
}
} }
private String getQueueNameFromUrl(String queueUrl) { private String getQueueNameFromUrl(String queueUrl) {

View File

@ -38,7 +38,6 @@ import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.PaginatedRemover;
import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.service.Validator;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import java.util.List; import java.util.List;
@ -51,9 +50,6 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ
@Autowired @Autowired
private QueueDao queueDao; private QueueDao queueDao;
@Autowired
private TenantDao tenantDao;
@Autowired @Autowired
private TbTenantProfileCache tenantProfileCache; private TbTenantProfileCache tenantProfileCache;
@ -101,21 +97,21 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ
int oldPartitions = oldQueue.getPartitions(); int oldPartitions = oldQueue.getPartitions();
int currentPartitions = queue.getPartitions(); int currentPartitions = queue.getPartitions();
//TODO: 3.2 remove if partitions can't be deleted. //TODO: remove if partitions can't be deleted.
// if (currentPartitions != oldPartitions && tbQueueAdmin != null) { if (currentPartitions != oldPartitions && tbQueueAdmin != null) {
// queueClusterService.onQueueDelete(queue, null); // queueClusterService.onQueueDelete(queue, null);
// if (currentPartitions > oldPartitions) { if (currentPartitions > oldPartitions) {
// log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName());
// for (int i = oldPartitions; i < currentPartitions; i++) { for (int i = oldPartitions; i < currentPartitions; i++) {
// tbQueueAdmin.createTopicIfNotExists(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); tbQueueAdmin.createTopicIfNotExists(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
// } }
// } else { } else {
// log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName());
// for (int i = currentPartitions; i < oldPartitions; i++) { for (int i = currentPartitions; i < oldPartitions; i++) {
// tbQueueAdmin.deleteTopic(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); tbQueueAdmin.deleteTopic(new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
// } }
// } }
// } }
return updatedQueue; return updatedQueue;
} }