Minor refactoring for TopicPartitionInfo
This commit is contained in:
parent
0e8e460434
commit
95d1b5ebd7
@ -251,7 +251,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
|||||||
mainConsumer.update(event.getCorePartitions());
|
mainConsumer.update(event.getCorePartitions());
|
||||||
usageStatsConsumer.subscribe(event.getCorePartitions()
|
usageStatsConsumer.subscribe(event.getCorePartitions()
|
||||||
.stream()
|
.stream()
|
||||||
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getConsumer().getTopic()))
|
.map(tpi -> tpi.withTopic(usageStatsConsumer.getConsumer().getTopic()))
|
||||||
.collect(Collectors.toSet()));
|
.collect(Collectors.toSet()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -57,10 +57,6 @@ public class TopicPartitionInfo {
|
|||||||
this(topic, tenantId, partition, false, myPartition);
|
this(topic, tenantId, partition, false, myPartition);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopicPartitionInfo newByTopic(String topic) {
|
|
||||||
return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTopic() {
|
public String getTopic() {
|
||||||
return topic;
|
return topic;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -136,9 +136,6 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public CreateTopicsResult createTopic(NewTopic topic) {
|
public CreateTopicsResult createTopic(NewTopic topic) {
|
||||||
if (!topic.name().startsWith("test.")) { // FIXME: remove me
|
|
||||||
log.error("Creating topic without configured prefix: {}", topic.name(), new RuntimeException("stacktrace"));
|
|
||||||
}
|
|
||||||
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
|
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -131,7 +131,7 @@ public class DefaultTbApiUsageReportClient implements TbApiUsageReportClient {
|
|||||||
report.forEach((parent, statsMsg) -> {
|
report.forEach((parent, statsMsg) -> {
|
||||||
try {
|
try {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, parent.getTenantId(), parent.getId())
|
||||||
.newByTopic(msgProducer.getDefaultTopic());
|
.withTopic(msgProducer.getDefaultTopic());
|
||||||
reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build());
|
reportStatsPerTpi.computeIfAbsent(tpi, k -> new ArrayList<>()).add(statsMsg.build());
|
||||||
} catch (TenantNotFoundException e) {
|
} catch (TenantNotFoundException e) {
|
||||||
log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId());
|
log.debug("Couldn't report usage stats for non-existing tenant: {}", e.getTenantId());
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user