diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 3df3cb9559..efd076f6ea 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -556,6 +556,12 @@ queue: sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" rabbitmq: exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index 336514b7b2..229d6b4244 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -16,27 +16,32 @@ package org.thingsboard.server.queue.azure.servicebus; import com.microsoft.azure.servicebus.management.ManagementClient; +import com.microsoft.azure.servicebus.management.QueueDescription; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.stereotype.Component; import org.thingsboard.server.queue.TbQueueAdmin; import java.io.IOException; +import java.time.Duration; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j -@Component -@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") public class TbServiceBusAdmin implements TbQueueAdmin { + private final String MAX_SIZE = "maxSizeInMb"; + private final String MESSAGE_TIME_TO_LIVE = "messageTimeToLiveInSec"; + private final String LOCK_DURATION = "lockDurationInSec"; + private final Map queueConfigs; private final Set queues = ConcurrentHashMap.newKeySet(); private final ManagementClient client; - public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) { + public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings, Map queueConfigs) { + this.queueConfigs = queueConfigs; + ConnectionStringBuilder builder = new ConnectionStringBuilder( serviceBusSettings.getNamespaceName(), "queues", @@ -60,13 +65,34 @@ public class TbServiceBusAdmin implements TbQueueAdmin { } try { - client.createQueue(topic); + QueueDescription queueDescription = new QueueDescription(topic); + setQueueConfigs(queueDescription); + + client.createQueue(queueDescription); queues.add(topic); } catch (ServiceBusException | InterruptedException e) { log.error("Failed to create queue: [{}]", topic, e); } } + private void setQueueConfigs(QueueDescription queueDescription) { + queueConfigs.forEach((confKey, confValue) -> { + switch (confKey) { + case MAX_SIZE: + queueDescription.setMaxSizeInMB(Long.parseLong(confValue)); + break; + case MESSAGE_TIME_TO_LIVE: + queueDescription.setDefaultMessageTimeToLive(Duration.ofSeconds(Long.parseLong(confValue))); + break; + case LOCK_DURATION: + queueDescription.setLockDuration(Duration.ofSeconds(Long.parseLong(confValue))); + break; + default: + log.error("Unknown config: [{}]", confKey); + } + }); + } + public void destroy() { try { client.close(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java new file mode 100644 index 0000000000..21ab455d6e --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.azure.servicebus; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") +public class TbServiceBusQueueConfigs { + @Value("${queue.service-bus.queue-properties.core}") + private String coreProperties; + @Value("${queue.service-bus.queue-properties.rule-engine}") + private String ruleEngineProperties; + @Value("${queue.service-bus.queue-properties.transport-api}") + private String transportApiProperties; + @Value("${queue.service-bus.queue-properties.notifications}") + private String notificationsProperties; + @Value("${queue.service-bus.queue-properties.js-executor}") + private String jsExecutorProperties; + + @Getter + private Map coreConfigs; + @Getter + private Map ruleEngineConfigs; + @Getter + private Map transportApiConfigs; + @Getter + private Map notificationsConfigs; + @Getter + private Map jsExecutorConfigs; + + @PostConstruct + private void init() { + coreConfigs = getConfigs(coreProperties); + ruleEngineConfigs = getConfigs(ruleEngineProperties); + transportApiConfigs = getConfigs(transportApiProperties); + notificationsConfigs = getConfigs(notificationsProperties); + jsExecutorConfigs = getConfigs(jsExecutorProperties); + } + + private Map getConfigs(String properties) { + Map configs = new HashMap<>(); + for (String property : properties.split(";")) { + int delimiterPosition = property.indexOf(":"); + String key = property.substring(0, delimiterPosition); + String value = property.substring(delimiterPosition + 1); + configs.put(key, value); + } + return configs; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 72c6f7b8b9..2aca5e6da1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; @Component @@ -247,4 +248,23 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); return builder.build(); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index ca10830981..632a6e480e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -47,6 +47,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; @Component @@ -218,4 +219,22 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { return builder.build(); } + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index bcac399a64..dc44caf3f0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -45,6 +45,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; @Component @@ -190,4 +191,20 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); return builder.build(); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index 2f7aab4779..505aa203d7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -40,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='kafka' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @Slf4j @@ -135,4 +137,20 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.admin(notificationAdmin); return responseBuilder.build(); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index 8b01c38607..3c82e18d91 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -30,19 +30,25 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { @@ -54,7 +60,12 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceBusSettings serviceBusSettings; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -62,7 +73,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceBusSettings serviceBusSettings, - TbQueueAdmin admin) { + TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -70,73 +81,97 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; - this.admin = admin; + + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); + return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), + return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic()); + return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index 523ac88bd1..bcb48630f9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -29,8 +29,10 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -40,6 +42,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { @@ -50,7 +54,12 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; public ServiceBusTbCoreQueueFactory(TbServiceBusSettings serviceBusSettings, TbQueueCoreSettings coreSettings, @@ -58,67 +67,91 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, - TbQueueAdmin admin) { + TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.serviceBusSettings = serviceBusSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; - this.admin = admin; + + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToCoreMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToCoreNotificationsMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createTransportApiRequestConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), + return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueProducer> createTransportApiResponseProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java index 8a2f9b396b..1f492596a1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -27,8 +27,10 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -38,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -47,55 +51,63 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbServiceBusSettings serviceBusSettings; - private final TbQueueAdmin admin; + + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin ruleEngineAdmin; + private final TbQueueAdmin jsExecutorAdmin; + private final TbQueueAdmin notificationAdmin; public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbServiceBusSettings serviceBusSettings, - TbQueueAdmin admin) { + TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.serviceBusSettings = serviceBusSettings; - this.admin = admin; + + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); } @Override public TbQueueProducer> createTransportNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createRuleEngineNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueProducer> createTbCoreNotificationsMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); } @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); } @@ -104,4 +116,20 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (ruleEngineAdmin != null) { + ruleEngineAdmin.destroy(); + } + if (jsExecutorAdmin != null) { + jsExecutorAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java index 33c5277ca5..0b5640d384 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTransportQueueFactory.java @@ -23,6 +23,8 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -33,6 +35,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; +import javax.annotation.PreDestroy; + @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @Slf4j @@ -40,37 +44,43 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceBusSettings serviceBusSettings; - private final TbQueueAdmin admin; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; + private final TbQueueAdmin coreAdmin; + private final TbQueueAdmin transportApiAdmin; + private final TbQueueAdmin notificationAdmin; + public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, - TbQueueTransportNotificationSettings transportNotificationSettings, - TbServiceBusSettings serviceBusSettings, - TbServiceInfoProvider serviceInfoProvider, + TbQueueTransportNotificationSettings transportNotificationSettings, + TbServiceBusSettings serviceBusSettings, + TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, - TbQueueAdmin admin) { + TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; - this.admin = admin; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; + + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); } @Override public TbQueueRequestTemplate, TbProtoQueueMsg> createTransportApiRequestTemplate() { TbQueueProducer> producerTemplate = - new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic()); TbQueueConsumer> consumerTemplate = - new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); - templateBuilder.queueAdmin(admin); + templateBuilder.queueAdmin(transportApiAdmin); templateBuilder.requestTemplate(producerTemplate); templateBuilder.responseTemplate(consumerTemplate); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); @@ -81,18 +91,31 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory @Override public TbQueueProducer> createRuleEngineMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); + return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic()); } @Override public TbQueueProducer> createTbCoreMsgProducer() { - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); } @Override public TbQueueConsumer> createTransportNotificationsConsumer() { - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); } + + @PreDestroy + private void destroy() { + if (coreAdmin != null) { + coreAdmin.destroy(); + } + if (transportApiAdmin != null) { + transportApiAdmin.destroy(); + } + if (notificationAdmin != null) { + notificationAdmin.destroy(); + } + } } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 2d70330187..47a011c35f 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -95,6 +95,12 @@ queue: sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" rabbitmq: exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 0b86dc026f..55236dd84f 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -96,6 +96,12 @@ queue: sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" rabbitmq: exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e267016623..3872a6d0b3 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -116,6 +116,12 @@ queue: sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" + queue-properties: + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" rabbitmq: exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"