diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index ba806c5daf..af0b276c1a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -45,6 +45,8 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'") public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { @@ -153,4 +155,23 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index eaffce241f..0a23d58e46 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -42,6 +42,8 @@ import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { @@ -128,4 +130,20 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 3206f03250..79501130ed 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -41,6 +41,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -117,4 +119,20 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java index 2a5de01f86..7e859e02ab 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueFactory.java @@ -40,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @Slf4j @@ -111,4 +113,20 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory { transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java index f3b6d3f10c..1241370d05 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java @@ -40,9 +40,10 @@ public class TbPubSubAdmin implements TbQueueAdmin { private static final String ACK_DEADLINE = "ackDeadlineInSec"; private static final String MESSAGE_RETENTION = "messageRetentionInSec"; + private final TopicAdminClient topicAdminClient; + private final SubscriptionAdminClient subscriptionAdminClient; + private final TbPubSubSettings pubSubSettings; - private final SubscriptionAdminSettings subscriptionAdminSettings; - private final TopicAdminSettings topicAdminSettings; private final Set topicSet = ConcurrentHashMap.newKeySet(); private final Set subscriptionSet = ConcurrentHashMap.newKeySet(); private final Map subscriptionProperties; @@ -51,6 +52,7 @@ public class TbPubSubAdmin implements TbQueueAdmin { this.pubSubSettings = pubSubSettings; this.subscriptionProperties = subscriptionSettings; + TopicAdminSettings topicAdminSettings; try { topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); } catch (IOException e) { @@ -58,6 +60,7 @@ public class TbPubSubAdmin implements TbQueueAdmin { throw new RuntimeException("Failed to create TopicAdminSettings."); } + SubscriptionAdminSettings subscriptionAdminSettings; try { subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); } catch (IOException e) { @@ -65,7 +68,9 @@ public class TbPubSubAdmin implements TbQueueAdmin { throw new RuntimeException("Failed to create SubscriptionAdminSettings."); } - try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { + try { + topicAdminClient = TopicAdminClient.create(topicAdminSettings); + ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); @@ -77,7 +82,8 @@ public class TbPubSubAdmin implements TbQueueAdmin { throw new RuntimeException("Failed to get topics.", e); } - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { + try { + subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings); ListSubscriptionsRequest listSubscriptionsRequest = ListSubscriptionsRequest.newBuilder() @@ -104,31 +110,21 @@ public class TbPubSubAdmin implements TbQueueAdmin { return; } - try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { - ListTopicsRequest listTopicsRequest = - ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); - TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); - for (Topic topic : response.iterateAll()) { - if (topic.getName().contains(topicName.toString())) { - topicSet.add(topic.getName()); - createSubscriptionIfNotExists(partition, topicName); - return; - } + ListTopicsRequest listTopicsRequest = + ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); + TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); + for (Topic topic : response.iterateAll()) { + if (topic.getName().contains(topicName.toString())) { + topicSet.add(topic.getName()); + createSubscriptionIfNotExists(partition, topicName); + return; } - - topicAdminClient.createTopic(topicName); - topicSet.add(topicName.toString()); - log.info("Created new topic: [{}]", topicName.toString()); - createSubscriptionIfNotExists(partition, topicName); - } catch (IOException e) { - log.error("Failed to create topic: [{}].", topicName.toString(), e); - throw new RuntimeException("Failed to create topic.", e); } - } - - @Override - public void destroy() { + topicAdminClient.createTopic(topicName); + topicSet.add(topicName.toString()); + log.info("Created new topic: [{}]", topicName.toString()); + createSubscriptionIfNotExists(partition, topicName); } private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { @@ -139,36 +135,27 @@ public class TbPubSubAdmin implements TbQueueAdmin { return; } - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { - ListSubscriptionsRequest listSubscriptionsRequest = - ListSubscriptionsRequest.newBuilder() - .setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()) - .build(); - SubscriptionAdminClient.ListSubscriptionsPagedResponse response = - subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest); - - for (Subscription subscription : response.iterateAll()) { - if (subscription.getName().equals(subscriptionName.toString())) { - subscriptionSet.add(subscription.getName()); - return; - } + ListSubscriptionsRequest listSubscriptionsRequest = + ListSubscriptionsRequest.newBuilder().setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()).build(); + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest); + for (Subscription subscription : response.iterateAll()) { + if (subscription.getName().equals(subscriptionName.toString())) { + subscriptionSet.add(subscription.getName()); + return; } - - Subscription.Builder subscriptionBuilder = Subscription - .newBuilder() - .setName(subscriptionName.toString()) - .setTopic(topicName.toString()); - - setAckDeadline(subscriptionBuilder); - setMessageRetention(subscriptionBuilder); - - subscriptionAdminClient.createSubscription(subscriptionBuilder.build()); - subscriptionSet.add(subscriptionName.toString()); - log.info("Created new subscription: [{}]", subscriptionName.toString()); - } catch (IOException e) { - log.error("Failed to create subscription: [{}].", subscriptionName.toString(), e); - throw new RuntimeException("Failed to create subscription.", e); } + + Subscription.Builder subscriptionBuilder = Subscription + .newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()); + + setAckDeadline(subscriptionBuilder); + setMessageRetention(subscriptionBuilder); + + subscriptionAdminClient.createSubscription(subscriptionBuilder.build()); + subscriptionSet.add(subscriptionName.toString()); + log.info("Created new subscription: [{}]", subscriptionName.toString()); } private void setAckDeadline(Subscription.Builder builder) { @@ -186,4 +173,14 @@ public class TbPubSubAdmin implements TbQueueAdmin { builder.setMessageRetentionDuration(duration); } } + + @Override + public void destroy() { + if (topicAdminClient != null) { + topicAdminClient.close(); + } + if (subscriptionAdminClient != null) { + subscriptionAdminClient.close(); + } + } }