added Azure Service Bus queue settings

This commit is contained in:
YevhenBondarenko 2020-04-26 00:57:46 +03:00 committed by Andrew Shvayka
parent 30e5af3627
commit ba140644c7
14 changed files with 368 additions and 54 deletions

View File

@ -556,6 +556,12 @@ queue:
sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
rabbitmq: rabbitmq:
exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"

View File

@ -16,27 +16,32 @@
package org.thingsboard.server.queue.azure.servicebus; package org.thingsboard.server.queue.azure.servicebus;
import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.ManagementClient;
import com.microsoft.azure.servicebus.management.QueueDescription;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException; import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
@Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
public class TbServiceBusAdmin implements TbQueueAdmin { public class TbServiceBusAdmin implements TbQueueAdmin {
private final String MAX_SIZE = "maxSizeInMb";
private final String MESSAGE_TIME_TO_LIVE = "messageTimeToLiveInSec";
private final String LOCK_DURATION = "lockDurationInSec";
private final Map<String, String> queueConfigs;
private final Set<String> queues = ConcurrentHashMap.newKeySet(); private final Set<String> queues = ConcurrentHashMap.newKeySet();
private final ManagementClient client; private final ManagementClient client;
public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) { public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings, Map<String, String> queueConfigs) {
this.queueConfigs = queueConfigs;
ConnectionStringBuilder builder = new ConnectionStringBuilder( ConnectionStringBuilder builder = new ConnectionStringBuilder(
serviceBusSettings.getNamespaceName(), serviceBusSettings.getNamespaceName(),
"queues", "queues",
@ -60,13 +65,34 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
} }
try { try {
client.createQueue(topic); QueueDescription queueDescription = new QueueDescription(topic);
setQueueConfigs(queueDescription);
client.createQueue(queueDescription);
queues.add(topic); queues.add(topic);
} catch (ServiceBusException | InterruptedException e) { } catch (ServiceBusException | InterruptedException e) {
log.error("Failed to create queue: [{}]", topic, e); log.error("Failed to create queue: [{}]", topic, e);
} }
} }
private void setQueueConfigs(QueueDescription queueDescription) {
queueConfigs.forEach((confKey, confValue) -> {
switch (confKey) {
case MAX_SIZE:
queueDescription.setMaxSizeInMB(Long.parseLong(confValue));
break;
case MESSAGE_TIME_TO_LIVE:
queueDescription.setDefaultMessageTimeToLive(Duration.ofSeconds(Long.parseLong(confValue)));
break;
case LOCK_DURATION:
queueDescription.setLockDuration(Duration.ofSeconds(Long.parseLong(confValue)));
break;
default:
log.error("Unknown config: [{}]", confKey);
}
});
}
public void destroy() { public void destroy() {
try { try {
client.close(); client.close();

View File

@ -0,0 +1,71 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.azure.servicebus;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'")
public class TbServiceBusQueueConfigs {
@Value("${queue.service-bus.queue-properties.core}")
private String coreProperties;
@Value("${queue.service-bus.queue-properties.rule-engine}")
private String ruleEngineProperties;
@Value("${queue.service-bus.queue-properties.transport-api}")
private String transportApiProperties;
@Value("${queue.service-bus.queue-properties.notifications}")
private String notificationsProperties;
@Value("${queue.service-bus.queue-properties.js-executor}")
private String jsExecutorProperties;
@Getter
private Map<String, String> coreConfigs;
@Getter
private Map<String, String> ruleEngineConfigs;
@Getter
private Map<String, String> transportApiConfigs;
@Getter
private Map<String, String> notificationsConfigs;
@Getter
private Map<String, String> jsExecutorConfigs;
@PostConstruct
private void init() {
coreConfigs = getConfigs(coreProperties);
ruleEngineConfigs = getConfigs(ruleEngineProperties);
transportApiConfigs = getConfigs(transportApiProperties);
notificationsConfigs = getConfigs(notificationsProperties);
jsExecutorConfigs = getConfigs(jsExecutorProperties);
}
private Map<String, String> getConfigs(String properties) {
Map<String, String> configs = new HashMap<>();
for (String property : properties.split(";")) {
int delimiterPosition = property.indexOf(":");
String key = property.substring(0, delimiterPosition);
String value = property.substring(delimiterPosition + 1);
configs.put(key, value);
}
return configs;
}
}

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Component @Component
@ -247,4 +248,23 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
return builder.build(); return builder.build();
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -47,6 +47,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Component @Component
@ -218,4 +219,22 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
return builder.build(); return builder.build();
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Component @Component
@ -190,4 +191,20 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
return builder.build(); return builder.build();
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -40,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @ConditionalOnExpression("'${queue.type:null}'=='kafka' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
@Slf4j @Slf4j
@ -135,4 +137,20 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
responseBuilder.admin(notificationAdmin); responseBuilder.admin(notificationAdmin);
return responseBuilder.build(); return responseBuilder.build();
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -30,19 +30,25 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'")
public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
@ -54,7 +60,12 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbServiceBusSettings serviceBusSettings; private final TbServiceBusSettings serviceBusSettings;
private final TbQueueAdmin admin;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings, TbQueueRuleEngineSettings ruleEngineSettings,
@ -62,7 +73,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
TbQueueTransportApiSettings transportApiSettings, TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings, TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusSettings serviceBusSettings, TbServiceBusSettings serviceBusSettings,
TbQueueAdmin admin) { TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.partitionService = partitionService; this.partitionService = partitionService;
this.coreSettings = coreSettings; this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
@ -70,73 +81,97 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
this.transportApiSettings = transportApiSettings; this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings; this.transportNotificationSettings = transportNotificationSettings;
this.serviceBusSettings = serviceBusSettings; this.serviceBusSettings = serviceBusSettings;
this.admin = admin;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings,
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings,
partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic()); return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic());
} }
@Override @Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -29,8 +29,10 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -40,6 +42,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'")
public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@ -50,7 +54,12 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportApiSettings transportApiSettings;
private final PartitionService partitionService; private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueAdmin admin;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
public ServiceBusTbCoreQueueFactory(TbServiceBusSettings serviceBusSettings, public ServiceBusTbCoreQueueFactory(TbServiceBusSettings serviceBusSettings,
TbQueueCoreSettings coreSettings, TbQueueCoreSettings coreSettings,
@ -58,67 +67,91 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueRuleEngineSettings ruleEngineSettings, TbQueueRuleEngineSettings ruleEngineSettings,
PartitionService partitionService, PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider, TbServiceInfoProvider serviceInfoProvider,
TbQueueAdmin admin) { TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.serviceBusSettings = serviceBusSettings; this.serviceBusSettings = serviceBusSettings;
this.coreSettings = coreSettings; this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings; this.transportApiSettings = transportApiSettings;
this.ruleEngineSettings = ruleEngineSettings; this.ruleEngineSettings = ruleEngineSettings;
this.partitionService = partitionService; this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.admin = admin;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings,
partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -27,8 +27,10 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -38,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'")
public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
@ -47,55 +51,63 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbServiceBusSettings serviceBusSettings; private final TbServiceBusSettings serviceBusSettings;
private final TbQueueAdmin admin;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin notificationAdmin;
public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings, TbQueueRuleEngineSettings ruleEngineSettings,
TbServiceInfoProvider serviceInfoProvider, TbServiceInfoProvider serviceInfoProvider,
TbServiceBusSettings serviceBusSettings, TbServiceBusSettings serviceBusSettings,
TbQueueAdmin admin) { TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.partitionService = partitionService; this.partitionService = partitionService;
this.coreSettings = coreSettings; this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings; this.ruleEngineSettings = ruleEngineSettings;
this.serviceBusSettings = serviceBusSettings; this.serviceBusSettings = serviceBusSettings;
this.admin = admin;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings,
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@ -104,4 +116,20 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
return null; return null;
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
if (jsExecutorAdmin != null) {
jsExecutorAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -23,6 +23,8 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -33,6 +35,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
@Component @Component
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
@Slf4j @Slf4j
@ -40,37 +44,43 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbServiceBusSettings serviceBusSettings; private final TbServiceBusSettings serviceBusSettings;
private final TbQueueAdmin admin;
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings; private final TbQueueCoreSettings coreSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings, TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusSettings serviceBusSettings, TbServiceBusSettings serviceBusSettings,
TbServiceInfoProvider serviceInfoProvider, TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings, TbQueueCoreSettings coreSettings,
TbQueueAdmin admin) { TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.transportApiSettings = transportApiSettings; this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings; this.transportNotificationSettings = transportNotificationSettings;
this.serviceBusSettings = serviceBusSettings; this.serviceBusSettings = serviceBusSettings;
this.admin = admin;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings; this.coreSettings = coreSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
} }
@Override @Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() { public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate = TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate =
new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic());
TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate = TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate =
new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings,
transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); <TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(admin); templateBuilder.queueAdmin(transportApiAdmin);
templateBuilder.requestTemplate(producerTemplate); templateBuilder.requestTemplate(producerTemplate);
templateBuilder.responseTemplate(consumerTemplate); templateBuilder.responseTemplate(consumerTemplate);
templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
@ -81,18 +91,31 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic());
} }
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
} }
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() { public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() {
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings,
transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
} }
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
coreAdmin.destroy();
}
if (transportApiAdmin != null) {
transportApiAdmin.destroy();
}
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
}
} }

View File

@ -95,6 +95,12 @@ queue:
sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
rabbitmq: rabbitmq:
exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"

View File

@ -96,6 +96,12 @@ queue:
sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
rabbitmq: rabbitmq:
exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"

View File

@ -116,6 +116,12 @@ queue:
sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
rabbitmq: rabbitmq:
exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"