Fix missing queue prefixes
This commit is contained in:
parent
d15cb27ecf
commit
ef65dd9026
@ -18,6 +18,7 @@ package org.thingsboard.server.service.edqs;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.edqs.EdqsConfig;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
|
||||
@ -32,11 +33,11 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
|
||||
|
||||
private final boolean syncNeeded;
|
||||
|
||||
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) {
|
||||
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) {
|
||||
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
|
||||
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
|
||||
.mapToObj(partition -> TopicPartitionInfo.builder()
|
||||
.topic(edqsConfig.getEventsTopic())
|
||||
.topic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
|
||||
.partition(partition)
|
||||
.build().getFullTopicName())
|
||||
.collect(Collectors.toSet()));
|
||||
|
||||
@ -58,6 +58,7 @@ import org.thingsboard.server.queue.TbQueueResponseTemplate;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.edqs.EdqsComponent;
|
||||
import org.thingsboard.server.queue.edqs.EdqsConfig;
|
||||
@ -88,6 +89,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
private final EdqsRepository repository;
|
||||
private final EdqsConfig config;
|
||||
private final EdqsPartitionService partitionService;
|
||||
private final TopicService topicService;
|
||||
private final ConfigurableApplicationContext applicationContext;
|
||||
private final EdqsStateService stateService;
|
||||
|
||||
@ -123,7 +125,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
|
||||
eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
|
||||
.queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic()))
|
||||
.topic(config.getEventsTopic())
|
||||
.topic(topicService.buildTopicName(config.getEventsTopic()))
|
||||
.pollInterval(config.getPollInterval())
|
||||
.msgPackProcessor((msgs, consumer, config) -> {
|
||||
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
|
||||
@ -164,9 +166,9 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
try {
|
||||
Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
|
||||
|
||||
stateService.process(withTopic(newPartitions, config.getStateTopic()));
|
||||
stateService.process(withTopic(newPartitions, topicService.buildTopicName(config.getStateTopic())));
|
||||
// eventsConsumer's partitions are updated by stateService
|
||||
responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
|
||||
responseTemplate.subscribe(withTopic(newPartitions, topicService.buildTopicName(config.getRequestsTopic()))); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
|
||||
|
||||
Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
|
||||
if (CollectionsUtil.isNotEmpty(oldPartitions)) {
|
||||
|
||||
@ -36,6 +36,7 @@ import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
|
||||
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
|
||||
import org.thingsboard.server.queue.common.state.QueueStateService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.edqs.EdqsConfig;
|
||||
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
|
||||
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
|
||||
@ -59,6 +60,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
private final EdqsConfig config;
|
||||
private final EdqsPartitionService partitionService;
|
||||
private final KafkaEdqsQueueFactory queueFactory;
|
||||
private final TopicService topicService;
|
||||
@Autowired
|
||||
@Lazy
|
||||
private EdqsProcessor edqsProcessor;
|
||||
@ -78,7 +80,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
|
||||
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
|
||||
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
|
||||
.topic(config.getStateTopic())
|
||||
.topic(topicService.buildTopicName(config.getStateTopic()))
|
||||
.pollInterval(config.getPollInterval())
|
||||
.msgPackProcessor((msgs, consumer, config) -> {
|
||||
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
|
||||
@ -176,7 +178,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
if (queueStateService.getPartitions().isEmpty()) {
|
||||
Set<TopicPartitionInfo> allPartitions = IntStream.range(0, config.getPartitions())
|
||||
.mapToObj(partition -> TopicPartitionInfo.builder()
|
||||
.topic(config.getEventsTopic())
|
||||
.topic(topicService.buildTopicName(config.getEventsTopic()))
|
||||
.partition(partition)
|
||||
.build())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
@ -156,7 +156,7 @@ public class HashPartitionService implements PartitionService {
|
||||
|
||||
@Override
|
||||
public String getTopic(QueueKey queueKey) {
|
||||
return partitionTopicsMap.get(queueKey);
|
||||
return topicService.buildTopicName(partitionTopicsMap.get(queueKey));
|
||||
}
|
||||
|
||||
private void doInitRuleEnginePartitions() {
|
||||
|
||||
@ -136,6 +136,9 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
|
||||
}
|
||||
|
||||
public CreateTopicsResult createTopic(NewTopic topic) {
|
||||
if (!topic.name().startsWith("test.")) { // FIXME: remove me
|
||||
log.error("Creating topic without configured prefix: {}", topic.name(), new RuntimeException("stacktrace"));
|
||||
}
|
||||
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user