Merge pull request #10417 from dashevchenko/pubsubPrefixFix

Added global queue prefix for pubsub queue factory
This commit is contained in:
Viacheslav Klimov 2024-03-29 11:55:34 +02:00 committed by GitHub
commit ec36558f42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 27 deletions

View File

@ -109,40 +109,40 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic(),
return new TbPubSubConsumerTemplate<>(vcAdmin, pubSubSettings, topicService.buildTopicName(vcSettings.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders())
);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, topicService.buildTopicName(configuration.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@ -155,7 +155,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@ -168,13 +168,13 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getResponsesTopic()));
}
@Override
@ -202,29 +202,29 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, vcSettings.getTopic());
return new TbPubSubProducerTemplate<>(vcAdmin, pubSubSettings, topicService.buildTopicName(vcSettings.getTopic()));
}
@PreDestroy

View File

@ -100,32 +100,32 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(ruleEngineSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@ -138,13 +138,13 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getRequestsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, topicService.buildTopicName(transportApiSettings.getResponsesTopic()));
}
@Override
@ -172,24 +172,24 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic(),
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getOtaPackageTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getOtaPackageTopic()));
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, topicService.buildTopicName(coreSettings.getUsageStatsTopic()));
}
@Override