Short-lived cache for Kafka topics

VIacheslavKlimov 2025-08-06 14:43:34 +03:00
parent 1097005178
commit e7580f6093
4 changed files with 77 additions and 47 deletions


@@ -80,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
return;
}
- Set<String> topics = kafkaAdmin.getAllTopics();
- if (topics == null || topics.isEmpty()) {
+ Set<String> topics = kafkaAdmin.listTopics();
+ if (topics.isEmpty()) {
return;
}


@@ -29,7 +29,9 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
+ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
+ import org.thingsboard.common.util.CachedValue;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.Collections;
@@ -49,37 +51,44 @@ import java.util.stream.Collectors;
@Slf4j
public class KafkaAdmin {
/*
- * TODO: Get rid of per consumer/producer TbKafkaAdmin,
- * use single KafkaAdmin instance that accepts topicConfigs.
+ * TODO: Get rid of per consumer/producer TbKafkaAdmin,
+ * use single KafkaAdmin instance that accepts topicConfigs.
* */
private final TbKafkaSettings settings;
private final LazyInitializer<AdminClient> adminClient;
+ private final CachedValue<Set<String>> topics;
- private volatile Set<String> topics;
- public KafkaAdmin(TbKafkaSettings settings) {
+ public KafkaAdmin(@Lazy TbKafkaSettings settings) {
this.settings = settings;
this.adminClient = LazyInitializer.<AdminClient>builder()
.setInitializer(() -> AdminClient.create(settings.toAdminProps()))
.get();
+ this.topics = new CachedValue<>(() -> {
+ Set<String> topics = ConcurrentHashMap.newKeySet();
+ topics.addAll(listTopics());
+ return topics;
+ }, TimeUnit.MINUTES.toMillis(5));
}
public void createTopicIfNotExists(String topic, Map<String, String> properties, boolean force) {
- if (!force) {
- Set<String> topics = getTopics();
- if (topics.contains(topic)) {
- return;
- }
+ Set<String> topics = getTopics();
+ if (!force && topics.contains(topic)) {
+ log.trace("Topic {} already exists", topic);
+ return;
+ }
- try {
- String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
- int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1;
- NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties);
+ log.debug("Creating topic {} with properties {}", topic, properties);
+ String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
+ int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1;
+ NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties);
+ try {
createTopic(newTopic).values().get(topic).get();
topics.add(topic);
} catch (ExecutionException ee) {
log.trace("Failed to create topic {} with properties {}", topic, properties, ee);
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
@@ -93,48 +102,29 @@ public class KafkaAdmin {
}
public void deleteTopic(String topic) {
- Set<String> topics = getTopics();
- if (topics.remove(topic)) {
- getClient().deleteTopics(Collections.singletonList(topic));
- } else {
- try {
- if (getClient().listTopics().names().get().contains(topic)) {
- getClient().deleteTopics(Collections.singletonList(topic));
- } else {
- log.warn("Kafka topic [{}] does not exist.", topic);
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to delete kafka topic [{}].", topic, e);
- }
+ log.debug("Deleting topic {}", topic);
+ try {
+ getClient().deleteTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to delete kafka topic [{}].", topic, e);
+ }
}
private Set<String> getTopics() {
- if (topics == null) {
- synchronized (this) {
- if (topics == null) {
- topics = ConcurrentHashMap.newKeySet();
- try {
- topics.addAll(getClient().listTopics().names().get());
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to get all topics.", e);
- }
- }
- }
- }
- return topics;
+ return topics.get();
}
- public Set<String> getAllTopics() {
+ public Set<String> listTopics() {
try {
- return getClient().listTopics().names().get();
+ Set<String> topics = getClient().listTopics().names().get();
+ log.trace("Listed topics: {}", topics);
+ return topics;
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
+ return Collections.emptySet();
}
- return null;
}
public CreateTopicsResult createTopic(NewTopic topic) {
return getClient().createTopics(Collections.singletonList(topic));
}


@@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
- @SpringBootTest(classes = TbKafkaSettings.class)
+ @SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class})
@TestPropertySource(properties = {
"queue.type=kafka",
"queue.kafka.bootstrap.servers=localhost:9092",


@@ -0,0 +1,40 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class CachedValue<V> {
private static final Object KEY = new Object();
private final LoadingCache<Object, V> cache;
public CachedValue(Supplier<V> supplier, long valueTtlMs) {
this.cache = Caffeine.newBuilder()
.expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS)
.build(__ -> supplier.get());
}
public V get() {
return cache.get(KEY);
}
}
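
For reference, a minimal usage sketch of the new CachedValue helper (not part of the commit). The 5-minute TTL and the topic-set supplier mirror the KafkaAdmin change above; the CachedValueExample class, the fetchTopics() helper, and the sample topic names are illustrative assumptions only.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.thingsboard.common.util.CachedValue;

public class CachedValueExample {

    public static void main(String[] args) {
        // The supplier runs on the first get() and again only after the TTL expires;
        // the 5-minute TTL mirrors the value used for the Kafka topics cache above.
        CachedValue<Set<String>> topics = new CachedValue<>(() -> {
            Set<String> result = ConcurrentHashMap.newKeySet();
            result.addAll(fetchTopics()); // stand-in for kafkaAdmin.listTopics()
            return result;
        }, TimeUnit.MINUTES.toMillis(5));

        Set<String> first = topics.get();  // loads the value via the supplier
        Set<String> second = topics.get(); // served from the cache until the TTL elapses
        System.out.println(first == second); // same cached instance while the entry is fresh
    }

    // Hypothetical loader standing in for a real broker call such as KafkaAdmin#listTopics().
    private static Set<String> fetchTopics() {
        return Set.of("tb_core", "tb_rule_engine");
    }
}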