added executor provider for pubsub queue, added env variable to manage pubsub queue executor provider thread pool

This commit is contained in:
dashevchenko 2023-11-15 14:51:58 +02:00
parent 2e1a2b9677
commit 08727b4c36
24 changed files with 194 additions and 113 deletions

View File

@ -29,7 +29,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.service.executors.PubSubExecutorService;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.SmsService;
@ -325,7 +325,7 @@ public class ActorSystemContext {
@Autowired
@Getter
private PubSubExecutorService pubSubExecutorService;
private PubSubRuleNodeExecutorProvider pubSubRuleNodeExecutorProvider;
@Autowired
@Getter

View File

@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.server.service.executors.PubSubExecutorService;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
@ -105,6 +104,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;
@ -540,8 +540,8 @@ class DefaultTbContext implements TbContext {
}
@Override
public PubSubExecutorService getPubsubExecutor() {
return mainCtx.getPubSubExecutorService();
public PubSubRuleNodeExecutorProvider getPubSubRuleNodeExecutorProvider() {
return mainCtx.getPubSubRuleNodeExecutorProvider();
}
@Override

View File

@ -1,26 +0,0 @@
package org.thingsboard.server.service.executors;
import com.google.api.gax.core.FixedExecutorProvider;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.PubSubExecutor;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Component
public class PubSubExecutorService implements PubSubExecutor {
private static final int THREADS_PER_CPU = 5;
private FixedExecutorProvider fixedExecutorProvider;
@PostConstruct
public void init() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));;
this.fixedExecutorProvider = FixedExecutorProvider.create(scheduler);
}
public FixedExecutorProvider getExecutorProvider() {
return fixedExecutorProvider;
}
}

View File

@ -0,0 +1,29 @@
package org.thingsboard.server.service.executors;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Component
public class PubSubRuleNodeExecutorProvider implements ExecutorProvider {
/**
* Refers to com.google.cloud.pubsub.v1.Publisher default executor configuration
*/
private static final int THREADS_PER_CPU = 5;
private ScheduledExecutorService executor;
@PostConstruct
public void init() {
executor = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("pubsub-rule-nodes"));;
}
@Override
public ScheduledExecutorService getExecutor() {
return executor;
}
}

View File

@ -1270,7 +1270,7 @@ swagger:
# Queue configuration parameters
queue:
type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
type: "${TB_QUEUE_TYPE:pubsub}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
prefix: "${TB_QUEUE_PREFIX:}" # Global queue prefix. If specified, prefix is added before default topic name: 'prefix.default_topic_name'. Prefix is applied to all topics (and consumer groups for kafka) except of js executor topics (please use REMOTE_JS_EVAL_REQUEST_TOPIC and REMOTE_JS_EVAL_RESPONSE_TOPIC to specify custom topic names)
in_memory:
stats:
@ -1404,13 +1404,15 @@ queue:
version-control: "${TB_QUEUE_AWS_SQS_VC_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
pubsub:
# Project ID from Google Cloud
project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:pubsub-389210}"
# API Credentials in JSON format
service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:{\n \"type\": \"service_account\",\n \"project_id\": \"pubsub-389210\",\n \"private_key_id\": \"e83e233f60bb75f52e6ee9d34046c1e7b5e3f2e6\",\n \"private_key\": \"-----BEGIN PRIVATE KEY-----\\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCmram5Oa60kyFM\\nFwo7d4yYA2U4vLUa2HV7h5G5ZxhXTPk+V4eYneBPSX0bJ/V5L4z7N/xNNwDR80i3\\n+tCFIf9P+jS3sE0ThZDc8Rdgo1Ln8nm4qf/9uBrOdSjn5VKKwvnNXphwTniwzssS\\nWn3AMuL8flgKoOiRaBOMXykIWN4/epKmv1tmnnq57XLW2cRTsE+pCcbqn8W2LEgA\\nNP+NJvvhQ6wsU341F2oTMAsuwsmiOZ1aiKKCafjGb/STPYy/WXsLjrfwK08U2SJh\\n+hHyvzdH1AU40khelnLjQhrltUGk1VAaKtucDKGSAoL5JIkH0Q5qczz1Ys6l3VvK\\nlhDNP8NHAgMBAAECggEAFUVJSRGC6ZlzIN452I/vmcCpLL3sUMI57+tDngj5kr6I\\neju6WrAfHY8vz4TgbzgxwieJY6M20BQ/ffccoDjP97li++QcWfbsHz4HMTZ2kJIh\\nlX7gY9UYWquZ7koWKA4sydgeFQr2nP2u7actsBbuX7GR871IILJK+Fl2h5Grvybk\\nz70RzLY+M26ZVBf+ejKlyaoMVLQ6w272llc5/SJ4jmsbn1urLM2+ChgAjOeqo7Qp\\nBBjUJmjC9b5Tev0jg+FXdw+ba41bXmYM5x265buwbY4vYyIJ+XF6dEl36cdVa1xI\\n+XISQZ66SuvEkHRc+x1hEEMymOtt8d++SeFn4/BkDQKBgQDkGtYA/8751C8xDB4+\\nlpxXf3wN/QjWPpXAV1fyj9sgwz05M1E0KzYpX6H2rWVqFhPkuqZUCeZ5Iqjvybp7\\n8upLJ+lRpHag1dkS4ElICNNcHhIQB9xgmFtdH4euTzuAFTd5nZ43oRt6gGEo0I/S\\nmKmupS2IwpM2/Qm6dP7l2ruS5QKBgQC7D8eTIIu8x2plJ6rebEAneX6aAUzQ9chk\\nRB53wjDx0UCXC/J2l1PX0vGGY8JOQdmDS6MPdFaVZxnqtrrGNBDuSArDNH7bV0ab\\nBKnLStPOwkehQEhg6ShHX7r4EPs6GBTWDQnnCgR8KGq+7rdo/lz7C/TK8+xqRaWq\\nYCUMY4g+uwKBgHtPFokLwHPFhI1bI65p9LJINGPLec93nbSQgvaZVbfsU1hsqWUu\\nRuUu0XtsWPp0XOS1Ed00TOcHGZQm1SzUMFvYg4SjB44CjIprvLG4M6oEh3crCLMf\\nKaS5urs8Eco3rfmMf09LRHOAmwVZWaZa6L+Eg4z+wl5jg7LMNE9FY0kRAoGACU+J\\nwXr3OZg7ZXmJ+bQtpUlY0dWKu0Pgi40QbymNQhwGOP5xPRHfLHjlaKkCfN6uMujK\\n3vQNczZEhfg+Z6sjxJh1YK32OninnQOoZ+P7kuj8o7wNXjV8ucC8D6jYuFWGg/j2\\nKzfGbV+doI9FNcajXiOENa3acJey3T4X2fwCRg0CgYEA0m3iO8HvnWhAs+clP2Ph\\nI8mXNbNbtCa7bOPmhdidSwJpbXNNwKNU7xAXHHueFGlEnNFsQrqy7bPfol4ABOhl\\nV3Zdsuc5v1OXM7IrLZ16cC7fogydSQz8QK86TcC50u/aSmkaevujsJfabC/YF7sR\\nBl8tPN7I4H72JKOuUTn/Sfo=\\n-----END PRIVATE KEY-----\\n\",\n \"client_email\": \"pubsubaccountname@pubsub-389210.iam.gserviceaccount.com\",\n \"client_id\": \"103395736410565244836\",\n \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\n \"token_uri\": \"https://oauth2.googleapis.com/token\",\n \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\n \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/pubsubaccountname%40pubsub-389210.iam.gserviceaccount.com\",\n \"universe_domain\": \"googleapis.com\"\n}}"
# Message size for PubSub queue.Value in bytes
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubQueueExecutorProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
@ -77,6 +78,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin vcAdmin;
private final TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider;
public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
@ -87,7 +89,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
TbServiceInfoProvider serviceInfoProvider,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings) {
TbQueueVersionControlSettings vcSettings,
TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@ -96,6 +99,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
this.topicService = topicService;
this.serviceInfoProvider = serviceInfoProvider;
this.vcSettings = vcSettings;
this.tbPubSubQueueExecutorProvider = tbPubSubQueueExecutorProvider;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
@ -109,85 +113,91 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic(), tbPubSubQueueExecutorProvider);
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> createRemoteJsRequestTemplate() {
TbQueueProducer<TbProtoJsQueueMsg<RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic());
TbQueueProducer<TbProtoJsQueueMsg<RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic(), tbPubSubQueueExecutorProvider);
TbQueueConsumer<TbProtoQueueMsg<RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings,
jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> {
RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
});
}, tbPubSubQueueExecutorProvider);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
@ -203,28 +213,28 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic());
return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@PreDestroy

View File

@ -42,6 +42,7 @@ import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubQueueExecutorProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
@ -72,6 +73,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider;
public PubSubTbCoreQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
@ -81,7 +83,8 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings;
@ -90,6 +93,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.tbPubSubQueueExecutorProvider = tbPubSubQueueExecutorProvider;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
@ -100,64 +104,64 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic(), tbPubSubQueueExecutorProvider);
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic());
TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic(), tbPubSubQueueExecutorProvider);
TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings,
jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
});
}, tbPubSubQueueExecutorProvider);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
@ -173,23 +177,25 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(), tbPubSubQueueExecutorProvider);
}
@Override

View File

@ -40,6 +40,7 @@ import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubQueueExecutorProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
@ -62,6 +63,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -75,7 +77,8 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@ -83,6 +86,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.tbPubSubQueueExecutorProvider = tbPubSubQueueExecutorProvider;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
@ -92,53 +96,53 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic()));
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(configuration.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
@Bean
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic());
TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic(), tbPubSubQueueExecutorProvider);
TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings,
jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> {
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
});
}, tbPubSubQueueExecutorProvider);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
@ -153,12 +157,12 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()), tbPubSubQueueExecutorProvider);
}
@PreDestroy

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubQueueExecutorProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
@ -41,6 +42,7 @@ public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueue
private final TbQueueCoreSettings coreSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TopicService topicService;
private final TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin notificationAdmin;
@ -50,12 +52,14 @@ public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueue
TbQueueCoreSettings coreSettings,
TbQueueVersionControlSettings vcSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TopicService topicService
TopicService topicService,
TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider
) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
this.vcSettings = vcSettings;
this.topicService = topicService;
this.tbPubSubQueueExecutorProvider = tbPubSubQueueExecutorProvider;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
@ -64,18 +68,19 @@ public class PubSubTbVersionControlQueueFactory implements TbVersionControlQueue
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, topicService.buildTopicName(vcSettings.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders()),
tbPubSubQueueExecutorProvider
);
}

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
import org.thingsboard.server.queue.pubsub.TbPubSubQueueExecutorProvider;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
@ -58,6 +59,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TopicService topicService;
private final TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -71,7 +73,8 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings,
TopicService topicService) {
TopicService topicService,
TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider) {
this.pubSubSettings = pubSubSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
@ -79,6 +82,7 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.topicService = topicService;
this.tbPubSubQueueExecutorProvider = tbPubSubQueueExecutorProvider;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
@ -88,10 +92,10 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producer = new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()));
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producer = new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()), tbPubSubQueueExecutorProvider);
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumer = new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings,
topicService.buildTopicName(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
@ -106,29 +110,29 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() {
return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()), tbPubSubQueueExecutorProvider);
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()), tbPubSubQueueExecutorProvider);
}
@PreDestroy

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.pubsub;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
@ -62,7 +63,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
private final SubscriberStub subscriber;
private volatile int messagesPerTopic;
public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder, TbPubSubQueueExecutorProvider tbPubSubQueueExecutorProvider) {
super(topic);
this.admin = admin;
this.pubSubSettings = pubSubSettings;
@ -76,6 +77,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize())
.build())
.setExecutorProvider(FixedExecutorProvider.create(tbPubSubQueueExecutorProvider.getExecutor()))
.build();
this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
} catch (IOException e) {

View File

@ -25,6 +25,7 @@ import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -53,16 +54,14 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
private static final int THREADS_PER_CPU = 5;
private final FixedExecutorProvider fixedExecutorProvider;
public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic, ExecutorProvider executorProvider) {
this.defaultTopic = defaultTopic;
this.admin = admin;
this.pubSubSettings = pubSubSettings;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("tb-pubsub-producer-scheduler"));;
fixedExecutorProvider = FixedExecutorProvider.create(scheduler);
fixedExecutorProvider = FixedExecutorProvider.create(executorProvider.getExecutor());
}
@Override

View File

@ -0,0 +1,37 @@
package org.thingsboard.server.queue.pubsub;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ConditionalOnExpression("'${queue.type:null}'=='pubsub'")
@Component
public class TbPubSubQueueExecutorProvider implements ExecutorProvider {
@Value("${queue.pubsub.executor_thread_pool_size}")
private Integer threadPoolSize;
/**
* Refers to com.google.cloud.pubsub.v1.Publisher default executor configuration
*/
private static final int THREADS_PER_CPU = 5;
private ScheduledExecutorService executor;
@PostConstruct
public void init() {
if (threadPoolSize == null) {
threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors();
}
executor = Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-queue-executor"));;
}
@Override
public ScheduledExecutorService getExecutor() {
return executor;
}
}

View File

@ -53,10 +53,6 @@
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>

View File

@ -0,0 +1,8 @@
package org.thingsboard.common.util;
import java.util.concurrent.ScheduledExecutorService;
public interface ExecutorProvider {
ScheduledExecutorService getExecutor();
}

View File

@ -1,7 +0,0 @@
package org.thingsboard.common.util;
import com.google.api.gax.core.FixedExecutorProvider;
public interface PubSubExecutor {
FixedExecutorProvider getExecutorProvider();
}

View File

@ -172,6 +172,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -16,8 +16,8 @@
package org.thingsboard.rule.engine.api;
import io.netty.channel.EventLoopGroup;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.common.util.PubSubExecutor;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.server.cluster.TbClusterService;
@ -319,7 +319,7 @@ public interface TbContext {
ListeningExecutor getNotificationExecutor();
PubSubExecutor getPubsubExecutor();
ExecutorProvider getPubSubRuleNodeExecutorProvider();
MailService getMailService(boolean isSystem);

View File

@ -69,7 +69,7 @@ public class TbPubSubNode extends TbAbstractExternalNode {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class);
try {
this.pubSubClient = initPubSubClient(ctx.getPubsubExecutor().getExecutorProvider());
this.pubSubClient = initPubSubClient(FixedExecutorProvider.create(ctx.getPubSubRuleNodeExecutorProvider().getExecutor()));
} catch (Exception e) {
throw new TbNodeException(e);
}

View File

@ -295,6 +295,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -278,6 +278,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per a consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -374,6 +374,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -311,6 +311,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -264,6 +264,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}"
queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"