Merge pull request #5749 from dmytro-landiak/queue-factories-fix

[3.3.3] Queue factories improvements
This commit is contained in:
Igor Kulikov 2021-12-20 15:48:21 +02:00 committed by GitHub
commit 3ccc0bd817
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 150 additions and 72 deletions

View File

@ -102,7 +102,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -112,7 +112,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
@ -182,13 +182,13 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -43,6 +43,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
@ -63,6 +64,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,7 +79,8 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.sqsSettings = sqsSettings;
this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings;
@ -85,6 +88,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
@ -95,7 +99,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getResponsesTopic());
}
@Override
@ -172,13 +176,13 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
@ -57,6 +58,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbAwsSqsSettings sqsSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -68,13 +70,15 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
TbServiceInfoProvider serviceInfoProvider,
TbAwsSqsSettings sqsSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings;
this.sqsSettings = sqsSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
@ -84,17 +88,17 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -104,7 +108,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
@ -51,34 +52,39 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
private final TbAwsSqsSettings sqsSettings;
private final TbQueueCoreSettings coreSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public AwsSqsTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbAwsSqsSettings sqsSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRuleEngineSettings ruleEngineSettings) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.sqsSettings = sqsSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
}
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings,
transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
@ -96,7 +102,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -126,5 +132,8 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}

View File

@ -131,7 +131,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -151,7 +151,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -328,5 +328,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -65,6 +66,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -80,6 +82,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueTransportApiSettings transportApiSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
@ -89,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.transportApiSettings = transportApiSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -103,8 +107,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-transport-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -124,7 +128,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -144,7 +148,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-to-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -192,8 +196,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
requestBuilder.admin(transportApiAdmin);
return requestBuilder.build();
}
@ -294,5 +298,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -46,6 +46,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -63,6 +64,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
@ -85,6 +88,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.ruleEngineSettings = ruleEngineSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -98,8 +102,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-transport-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -108,8 +112,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-rule-engine-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
return requestBuilder.build();
}
@ -119,11 +123,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
@ -150,7 +153,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -239,5 +242,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -112,7 +112,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
@ -122,7 +122,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override

View File

@ -46,7 +46,9 @@ import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -61,11 +63,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public PubSubTbCoreQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
@ -73,6 +78,8 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
@ -80,16 +87,19 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -99,7 +109,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
@ -109,7 +119,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
@ -133,7 +143,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
}
@Override
@ -195,5 +205,8 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -72,6 +74,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
@ -79,6 +82,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
@ -88,28 +92,27 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override

View File

@ -66,13 +66,13 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -89,13 +89,13 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs());
this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs());
this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs());
this.jsInvokeSettings = jsInvokeSettings;
}
@Override
@ -110,7 +110,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -120,7 +120,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -63,6 +64,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbRabbitMqQueueArguments queueArguments) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
@ -85,6 +88,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -95,7 +99,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getResponsesTopic());
}
@Override

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -71,6 +73,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
TbServiceInfoProvider serviceInfoProvider,
TbRabbitMqSettings rabbitMqSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbRabbitMqQueueArguments queueArguments) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
@ -78,6 +81,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
this.ruleEngineSettings = ruleEngineSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -87,17 +91,17 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -107,7 +111,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
@ -51,6 +52,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -62,12 +64,14 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
TbRabbitMqSettings rabbitMqSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbRabbitMqQueueArguments queueArguments) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -77,10 +81,10 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbRabbitMqProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
TbRabbitMqConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings,
transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
@ -98,7 +102,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override

View File

@ -109,7 +109,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -119,7 +119,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -63,6 +64,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.serviceBusSettings = serviceBusSettings;
this.coreSettings = coreSettings;
@ -85,6 +88,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
@ -95,7 +99,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic());
}
@Override

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbServiceBusSettings serviceBusSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -71,6 +73,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
TbServiceInfoProvider serviceInfoProvider,
TbServiceBusSettings serviceBusSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
@ -78,6 +81,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
this.ruleEngineSettings = ruleEngineSettings;
this.serviceBusSettings = serviceBusSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
@ -87,17 +91,17 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -107,7 +111,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
@ -51,14 +52,17 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
private final TbServiceBusSettings serviceBusSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusSettings serviceBusSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
@ -67,10 +71,12 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
this.serviceBusSettings = serviceBusSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
}
@Override
@ -96,7 +102,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic());
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -127,5 +133,8 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}