Merge pull request #13799 from thingsboard/fix/isolated-queues

Fix topics creation for isolated tenants
Viacheslav Klimov 2025-08-06 18:48:13 +03:00 committed by GitHub
commit f206983406
24 changed files with 439 additions and 326 deletions

View File

@@ -59,8 +59,7 @@ import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
@@ -153,10 +152,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private TbCoreQueueFactory tbCoreQueueFactory;
@Autowired
private Optional<TbKafkaSettings> kafkaSettings;
@Autowired
private Optional<TbKafkaTopicConfigs> kafkaTopicConfigs;
private Optional<KafkaAdmin> kafkaAdmin;
private Server server;
@@ -232,8 +228,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
private EdgeGrpcSession createEdgeGrpcSession(StreamObserver<ResponseMsg> outputStream) {
return kafkaSettings.isPresent() && kafkaTopicConfigs.isPresent()
? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaSettings.get(), kafkaTopicConfigs.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
return kafkaAdmin.isPresent()
? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaAdmin.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession)
: new PostgresEdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
@@ -643,10 +639,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
List<EdgeId> toRemove = new ArrayList<>();
for (EdgeGrpcSession session : sessions.values()) {
if (session instanceof KafkaEdgeGrpcSession kafkaSession &&
!kafkaSession.isConnected() &&
kafkaSession.getConsumer() != null &&
kafkaSession.getConsumer().getConsumer() != null &&
!kafkaSession.getConsumer().getConsumer().isStopped()) {
toRemove.add(kafkaSession.getEdge().getId());
}
}
@@ -663,4 +659,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.warn("Failed to cleanup kafka sessions", e);
}
}
}
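The factory above now keys the session type off a single Optional<KafkaAdmin> bean instead of two separate Kafka beans. A minimal sketch of the same optional-bean pattern, where newKafkaSession and newPostgresSession are hypothetical helpers standing in for the constructor calls in the hunk:

@Autowired
private Optional<KafkaAdmin> kafkaAdmin; // empty unless the KafkaAdmin bean is registered, i.e. queue.type=kafka

private EdgeGrpcSession createSession(StreamObserver<ResponseMsg> outputStream) {
    // Kafka-backed session when the admin bean exists, Postgres-backed fallback otherwise.
    return kafkaAdmin.<EdgeGrpcSession>map(admin -> newKafkaSession(admin, outputStream))
            .orElseGet(() -> newPostgresSession(outputStream));
}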

View File

@@ -32,9 +32,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.service.edge.EdgeContextComponent;
@@ -51,9 +49,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private final TopicService topicService;
private final TbCoreQueueFactory tbCoreQueueFactory;
private final TbKafkaSettings kafkaSettings;
private final TbKafkaTopicConfigs kafkaTopicConfigs;
private final KafkaAdmin kafkaAdmin;
private volatile boolean isHighPriorityProcessing;
@@ -63,21 +59,20 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private ExecutorService consumerExecutor;
public KafkaEdgeGrpcSession(EdgeContextComponent ctx, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory,
TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs, StreamObserver<ResponseMsg> outputStream,
KafkaAdmin kafkaAdmin, StreamObserver<ResponseMsg> outputStream,
BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, BiConsumer<Edge, UUID> sessionCloseListener,
ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) {
super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
this.topicService = topicService;
this.tbCoreQueueFactory = tbCoreQueueFactory;
this.kafkaSettings = kafkaSettings;
this.kafkaTopicConfigs = kafkaTopicConfigs;
this.kafkaAdmin = kafkaAdmin;
}
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId());
if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) {
log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " +
"connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
"connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing);
return;
}
@@ -159,7 +154,6 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
@Override
public void cleanUp() {
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic();
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
kafkaAdmin.deleteTopic(topic);
kafkaAdmin.deleteConsumerGroup(topic);
}

View File

@@ -35,7 +35,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@@ -63,7 +63,7 @@ public class EdgeStatsService {
private final TimeseriesService tsService;
private final EdgeStatsCounterService statsCounterService;
private final TopicService topicService;
private final Optional<TbKafkaAdmin> tbKafkaAdmin;
private final Optional<KafkaAdmin> kafkaAdmin;
@Value("${edges.stats.ttl:30}")
private int edgesStatsTtlDays;
@@ -81,12 +81,12 @@
long ts = now - (now % reportIntervalMillis);
Map<EdgeId, MsgCounters> countersByEdge = statsCounterService.getCounterByEdge();
Map<EdgeId, Long> lagByEdgeId = tbKafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap();
Map<EdgeId, Long> lagByEdgeId = kafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap();
Map<EdgeId, MsgCounters> countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge());
countersByEdgeSnapshot.forEach((edgeId, counters) -> {
TenantId tenantId = counters.getTenantId();
if (tbKafkaAdmin.isPresent()) {
if (kafkaAdmin.isPresent()) {
counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L));
}
List<TsKvEntry> statsEntries = List.of(
@@ -109,7 +109,7 @@
e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic()
));
Map<String, Long> lagByTopic = tbKafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));
Map<String, Long> lagByTopic = kafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));
return edgeToTopicMap.entrySet().stream()
.collect(Collectors.toMap(
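The hunk is cut off by the viewer, but the computation it sets up is straightforward: one lag lookup per consumer group (the per-edge group id equals the per-edge topic name here), then re-keying the result by EdgeId. A sketch, assuming the edgeToTopicMap built above:

Map<String, Long> lagByTopic = kafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));
Map<EdgeId, Long> lagByEdgeId = edgeToTopicMap.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey,
                e -> lagByTopic.getOrDefault(e.getValue(), 0L))); // zero lag when the group has no committed offsets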

View File

@@ -20,10 +20,8 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -33,8 +31,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
private final boolean syncNeeded;
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
public KafkaEdqsSyncService(KafkaAdmin kafkaAdmin, TopicService topicService, EdqsConfig edqsConfig) {
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
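The builder chain is truncated by the viewer; completed under the assumption that it ends with .partition(partition).build().getFullTopicName(), the startup check amounts to:

// syncNeeded is true only when every partition of the events topic has begin == end offset,
// i.e. nothing was ever written and a full re-sync from the database is required.
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
        .mapToObj(partition -> TopicPartitionInfo.builder()
                .topic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
                .partition(partition)
                .build()
                .getFullTopicName())
        .collect(Collectors.toSet()));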

View File

@@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
for (int i = oldPartitions; i < newPartitions; i++) {
tbQueueAdmin.createTopicIfNotExists(
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
queue.getCustomProperties()
);
queue.getCustomProperties(),
true); // forcing topic creation because the topic may still be cached on some nodes
}
}
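The force flag exists because KafkaAdmin (added in this commit) keeps a TTL cache of known topic names; after a partition-count change, a node's cache may still claim the topic exists and silently skip creation. A sketch of the contract against the new KafkaAdmin, with an illustrative topic name and a mutable properties map (createTopicIfNotExists may strip the "partitions" key from it):

Map<String, String> props = new HashMap<>(Map.of("retention.ms", "604800000"));
// force=false: returns early if the topic is already in the cached topic set.
kafkaAdmin.createTopicIfNotExists("tb_rule_engine.main.5", props, false);
// force=true: bypasses the cache and always issues createTopics(); a broker-side
// TopicExistsException is swallowed, so the call is safe to repeat.
kafkaAdmin.createTopicIfNotExists("tb_rule_engine.main.5", props, true);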

View File

@@ -30,9 +30,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.time.Instant;
@@ -57,7 +55,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
private final TenantService tenantService;
private final EdgeService edgeService;
private final AttributesService attributesService;
private final TbKafkaAdmin kafkaAdmin;
private final KafkaAdmin kafkaAdmin;
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
private long ttlSeconds;
@@ -67,13 +65,13 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService,
TenantService tenantService, AttributesService attributesService,
TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) {
TopicService topicService, KafkaAdmin kafkaAdmin) {
super(partitionService);
this.topicService = topicService;
this.tenantService = tenantService;
this.edgeService = edgeService;
this.attributesService = attributesService;
this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.kafkaAdmin = kafkaAdmin;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
@@ -82,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
return;
}
Set<String> topics = kafkaAdmin.getAllTopics();
if (topics == null || topics.isEmpty()) {
Set<String> topics = kafkaAdmin.listTopics();
if (topics.isEmpty()) {
return;
}
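listTopics() replaces getAllTopics() and returns an empty set instead of null on admin failure, which is why the null guard above is gone. A sketch of the resulting loop shape, where isEdgeEventTopic is a hypothetical predicate for per-edge topic names:

for (String topic : kafkaAdmin.listTopics()) { // never null; empty on failure, so the loop is simply skipped
    if (isEdgeEventTopic(topic)) {
        // TTL check, then kafkaAdmin.deleteTopic(topic) and kafkaAdmin.deleteConsumerGroup(topic)
    }
}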

View File

@@ -33,7 +33,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.service.edge.stats.EdgeStatsService;
import java.util.List;
@@ -141,7 +141,7 @@ public class EdgeStatsTest {
TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false);
when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo);
TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class);
KafkaAdmin kafkaAdmin = mock(KafkaAdmin.class);
when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic)))
.thenReturn(Map.of(topic, 15L));

View File

@@ -507,7 +507,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
consumerManager.delete(true);
await().atMost(2, TimeUnit.SECONDS)
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(ruleEngineMsgProducer).send(any(), any(), any());
});

View File

@@ -16,7 +16,7 @@
package org.thingsboard.server.queue;
public interface TbEdgeQueueAdmin extends TbQueueAdmin {
void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId);
void deleteConsumerGroup(String consumerGroupId);
}

View File

@@ -18,12 +18,13 @@ package org.thingsboard.server.queue;
public interface TbQueueAdmin {
default void createTopicIfNotExists(String topic) {
createTopicIfNotExists(topic, null);
createTopicIfNotExists(topic, null, false);
}
void createTopicIfNotExists(String topic, String properties);
void createTopicIfNotExists(String topic, String properties, boolean force);
void destroy();
void deleteTopic(String topic);
}
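Existing single-argument callers are unaffected: the default method now resolves to a non-forced create with no extra properties. A minimal conforming implementation, assuming the methods shown in this hunk are the whole contract (a no-op variant like this also appears in the rule-engine factory below):

TbQueueAdmin noOpAdmin = new TbQueueAdmin() {
    @Override
    public void createTopicIfNotExists(String topic, String properties, boolean force) {
        // no-op: nothing to manage for in-memory queues
    }
    @Override
    public void deleteTopic(String topic) {}
    @Override
    public void destroy() {}
};
noOpAdmin.createTopicIfNotExists("tb_core.0"); // delegates to createTopicIfNotExists("tb_core.0", null, false)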

View File

@@ -43,7 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.HashMap;
@@ -68,6 +68,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsExecutors edqsExecutors;
private final EdqsMapper mapper;
private final TopicService topicService;
private final KafkaAdmin kafkaAdmin;
@Autowired
@Lazy
private EdqsProcessor edqsProcessor;
@@ -86,7 +87,6 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
.topic(topicService.buildTopicName(config.getStateTopic()))
@@ -106,7 +106,7 @@
consumer.commit();
})
.consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer())
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.taskExecutor(edqsExecutors.getConsumerTaskExecutor())
.scheduler(edqsExecutors.getScheduler())
@@ -174,7 +174,7 @@
// (because we need to be able to consume the same topic-partition by multiple instances)
Map<String, Long> offsets = new HashMap<>();
try {
queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
kafkaAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
.forEach((topicPartition, offsetAndMetadata) -> {
offsets.put(topicPartition.topic(), offsetAndMetadata.offset());
});

View File

@@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory {
return new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic, String properties) {
public void createTopicIfNotExists(String topic, String properties, boolean force) {
}
@Override

View File

@@ -0,0 +1,285 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.CachedValue;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@TbKafkaComponent
@Component
@Slf4j
public class KafkaAdmin {
/*
* TODO: Get rid of per consumer/producer TbKafkaAdmin,
* use single KafkaAdmin instance that accepts topicConfigs.
* */
private final TbKafkaSettings settings;
@Value("${queue.kafka.request.timeout.ms:30000}")
private int requestTimeoutMs;
@Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default
private int topicsCacheTtlMs;
private final LazyInitializer<AdminClient> adminClient;
private final CachedValue<Set<String>> topics;
public KafkaAdmin(@Lazy TbKafkaSettings settings) {
this.settings = settings;
this.adminClient = LazyInitializer.<AdminClient>builder()
.setInitializer(() -> AdminClient.create(settings.toAdminProps()))
.get();
this.topics = new CachedValue<>(() -> {
Set<String> topics = ConcurrentHashMap.newKeySet();
topics.addAll(listTopics());
return topics;
}, topicsCacheTtlMs);
}
public void createTopicIfNotExists(String topic, Map<String, String> properties, boolean force) {
Set<String> topics = getTopics();
if (!force && topics.contains(topic)) {
log.trace("Topic {} already present in cache", topic);
return;
}
log.debug("Creating topic {} with properties {}", topic, properties);
String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1;
NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties);
try {
getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
topics.add(topic);
} catch (ExecutionException ee) {
log.trace("Failed to create topic {} with properties {}", topic, properties, ee);
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
log.warn("[{}] Failed to create topic", topic, ee);
throw new RuntimeException(ee);
}
} catch (Exception e) {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
}
public void deleteTopic(String topic) {
log.debug("Deleting topic {}", topic);
try {
getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
}
private Set<String> getTopics() {
return topics.get();
}
public Set<String> listTopics() {
try {
Set<String> topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.trace("Listed topics: {}", topics);
return topics;
} catch (Exception e) {
log.error("Failed to get all topics.", e);
return Collections.emptySet();
}
}
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
Map<String, Long> result = new HashMap<>();
for (String groupId : groupIds) {
result.put(groupId, getTotalConsumerGroupLag(groupId));
}
return result;
}
public long getTotalConsumerGroupLag(String groupId) {
try {
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
if (committedOffsets.isEmpty()) {
return 0L;
}
Map<TopicPartition, OffsetSpec> latestOffsetsSpec = committedOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
return committedOffsets.entrySet().stream()
.mapToLong(entry -> {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
long end = endOffsets.getOrDefault(tp,
new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset();
return end - committed;
}).sum();
} catch (Exception e) {
log.error("Failed to get total lag for consumer group: {}", groupId, e);
return 0L;
}
}
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
}
/**
* Sync offsets from a fat group to a single-partition group
* Migration back from single-partition consumer to a fat group is not supported
* TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
* */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
try {
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
if (partitionId == null) {
return;
}
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
} catch (Exception e) {
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
}
}
public void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
for (var consumerOffset : oldOffsets.entrySet()) {
var tp = consumerOffset.getKey();
if (!tp.topic().endsWith(topicSuffix)) {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
} else if (existingOffset.offset() >= om.offset()) {
log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
break;
} else {
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}
public boolean areAllTopicsEmpty(Set<String> topics) {
try {
List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
if (existingTopics.isEmpty()) {
return true;
}
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
.flatMap(entry -> {
String topic = entry.getKey();
TopicDescription topicDescription = entry.getValue();
return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
})
.toList();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
for (TopicPartition partition : allPartitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
if (beginningOffset != endOffset) {
log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
return false;
}
}
return true;
} catch (Exception e) {
log.error("Failed to check if topics [{}] empty.", topics, e);
return false;
}
}
public void deleteConsumerGroup(String consumerGroupId) {
try {
getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
}
}
public AdminClient getClient() {
try {
return adminClient.get();
} catch (ConcurrentException e) {
throw new RuntimeException("Failed to initialize Kafka admin client", e);
}
}
@PreDestroy
private void destroy() throws Exception {
if (adminClient.isInitialized()) {
adminClient.get().close();
}
}
}
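Taken as a whole, the class is a single process-wide admin facade: a lazily created AdminClient, a TTL-cached topic set, and offset/lag helpers. A usage sketch (topic name and properties are illustrative; the map must be mutable because createTopicIfNotExists removes the "partitions" key before sending the config to the broker):

Map<String, String> props = new HashMap<>();
props.put("partitions", "12"); // consumed locally to size the NewTopic, not passed as a broker config
props.put("retention.ms", "604800000");
kafkaAdmin.createTopicIfNotExists("tb_edge_event.notifications.my-edge", props, false);

long lag = kafkaAdmin.getTotalConsumerGroupLag("tb_edge_event.notifications.my-edge");
if (lag == 0 && kafkaAdmin.isTopicEmpty("tb_edge_event.notifications.my-edge")) {
    kafkaAdmin.deleteTopic("tb_edge_event.notifications.my-edge");
    kafkaAdmin.deleteConsumerGroup("tb_edge_event.notifications.my-edge"); // group id matches the topic name for edge queues
}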

View File

@@ -16,31 +16,12 @@
package org.thingsboard.server.queue.kafka;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 24.09.18.
@@ -52,251 +33,42 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
private final Map<String, String> topicConfigs;
@Getter
private final int numPartitions;
private volatile Set<String> topics;
private final short replicationFactor;
public TbKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) {
this.settings = settings;
this.topicConfigs = topicConfigs;
String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
if (numPartitionsStr != null) {
numPartitions = Integer.parseInt(numPartitionsStr);
} else {
numPartitions = 1;
}
replicationFactor = settings.getReplicationFactor();
}
@Override
public void createTopicIfNotExists(String topic, String properties) {
Set<String> topics = getTopics();
if (topics.contains(topic)) {
return;
}
try {
Map<String, String> configs = PropertyUtils.getProps(topicConfigs, properties);
configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(configs);
createTopic(newTopic).values().get(topic).get();
topics.add(topic);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
log.warn("[{}] Failed to create topic", topic, ee);
throw new RuntimeException(ee);
}
} catch (Exception e) {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
public void createTopicIfNotExists(String topic, String properties, boolean force) {
settings.getAdmin().createTopicIfNotExists(topic, PropertyUtils.getProps(topicConfigs, properties), force);
}
@Override
public void deleteTopic(String topic) {
Set<String> topics = getTopics();
if (topics.remove(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
try {
if (settings.getAdminClient().listTopics().names().get().contains(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
log.warn("Kafka topic [{}] does not exist.", topic);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
}
}
private Set<String> getTopics() {
if (topics == null) {
synchronized (this) {
if (topics == null) {
topics = ConcurrentHashMap.newKeySet();
try {
topics.addAll(settings.getAdminClient().listTopics().names().get());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
}
}
}
return topics;
}
public Set<String> getAllTopics() {
try {
return settings.getAdminClient().listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
return null;
}
public CreateTopicsResult createTopic(NewTopic topic) {
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
settings.getAdmin().deleteTopic(topic);
}
@Override
public void destroy() {
}
/**
* Sync offsets from a fat group to a single-partition group
* Migration back from single-partition consumer to a fat group is not supported
* TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
* */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
try {
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
if (partitionId == null) {
return;
}
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
} catch (Exception e) {
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
}
}
/**
* Sync edge notifications offsets from a fat group to a single group per edge
* */
public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) {
try {
log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId);
syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId);
settings.getAdmin().syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId);
} catch (Exception e) {
log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e);
}
}
@Override
public void deleteConsumerGroup(String consumerGroupId) {
try {
settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId));
} catch (Exception e) {
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
}
}
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
for (var consumerOffset : oldOffsets.entrySet()) {
var tp = consumerOffset.getKey();
if (!tp.topic().endsWith(topicSuffix)) {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
} else if (existingOffset.offset() >= om.offset()) {
log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
break;
} else {
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
}
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}
public boolean areAllTopicsEmpty(Set<String> topics) {
try {
List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
if (existingTopics.isEmpty()) {
return true;
}
List<TopicPartition> allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream()
.flatMap(entry -> {
String topic = entry.getKey();
TopicDescription topicDescription;
try {
topicDescription = entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
})
.toList();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get();
for (TopicPartition partition : allPartitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
if (beginningOffset != endOffset) {
log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
return false;
}
}
return true;
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to check if topics [{}] empty.", topics, e);
return false;
}
}
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
Map<String, Long> result = new HashMap<>();
for (String groupId : groupIds) {
result.put(groupId, getTotalConsumerGroupLag(groupId));
}
return result;
}
public long getTotalConsumerGroupLag(String groupId) {
try {
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
if (committedOffsets.isEmpty()) {
return 0L;
}
Map<TopicPartition, OffsetSpec> latestOffsetsSpec = committedOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
settings.getAdminClient().listOffsets(latestOffsetsSpec)
.all().get(10, TimeUnit.SECONDS);
return committedOffsets.entrySet().stream()
.mapToLong(entry -> {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
long end = endOffsets.getOrDefault(tp,
new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset();
return end - committed;
}).sum();
} catch (Exception e) {
log.error("Failed to get total lag for consumer group: {}", groupId, e);
return 0L;
}
}
}

View File

@@ -19,11 +19,11 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.util.TbKafkaComponent;
@Component
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
@Getter
@AllArgsConstructor
@NoArgsConstructor

View File

@@ -26,10 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.time.Duration;
import java.util.ArrayList;
@@ -44,11 +44,12 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
public class TbKafkaConsumerStatsService {
private final Set<String> monitoredGroups = ConcurrentHashMap.newKeySet();
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbKafkaConsumerStatisticConfig statsConfig;
private Consumer<String, byte[]> consumer;
@@ -77,7 +78,7 @@
}
for (String groupId : monitoredGroups) {
try {
Map<TopicPartition, OffsetAndMetadata> groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
Map<TopicPartition, OffsetAndMetadata> groupOffsets = kafkaSettings.getAdmin().getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
.get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration);
@@ -159,12 +160,14 @@
@Override
public String toString() {
return "[" +
"topic=[" + topic + ']' +
", partition=[" + partition + "]" +
", committedOffset=[" + committedOffset + "]" +
", endOffset=[" + endOffset + "]" +
", lag=[" + lag + "]" +
"]";
"topic=[" + topic + ']' +
", partition=[" + partition + "]" +
", committedOffset=[" + committedOffset + "]" +
", endOffset=[" + endOffset + "]" +
", lag=[" + lag + "]" +
"]";
}
}
}

View File

@@ -16,12 +16,10 @@
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -30,12 +28,13 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TbProperty;
import org.thingsboard.server.queue.util.PropertyUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -47,7 +46,7 @@ import java.util.Properties;
* Created by ashvayka on 25.09.18.
*/
@Slf4j
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
@ConfigurationProperties(prefix = "queue.kafka")
@Component
public class TbKafkaSettings {
@@ -143,6 +142,9 @@
@Value("${queue.kafka.consumer-properties-per-topic-inline:}")
private String consumerPropertiesPerTopicInline;
@Autowired
private KafkaAdmin kafkaAdmin;
@Deprecated
@Setter
private List<TbProperty> other;
@@ -150,8 +152,6 @@
@Setter
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = new HashMap<>();
private volatile AdminClient adminClient;
@PostConstruct
public void initInlineTopicProperties() {
Map<String, List<TbProperty>> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline);
@@ -240,15 +240,12 @@
}
}
public AdminClient getAdminClient() {
if (adminClient == null) {
synchronized (this) {
if (adminClient == null) {
adminClient = AdminClient.create(toAdminProps());
}
}
}
return adminClient;
/*
* Temporary solution to avoid major code changes.
* FIXME: use single instance of Kafka queue admin, don't create a separate one for each consumer/producer
* */
public KafkaAdmin getAdmin() {
return kafkaAdmin;
}
protected Properties toAdminProps() {
@@ -279,11 +276,4 @@
return result;
}
@PreDestroy
private void destroy() {
if (adminClient != null) {
adminClient.close();
}
}
}

View File

@@ -18,14 +18,14 @@ package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.util.PropertyUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.Map;
@Component
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
public class TbKafkaTopicConfigs {
public static final String NUM_PARTITIONS_SETTING = "partitions";

View File

@@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
templateBuilder.queueAdmin(new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic, String properties) {}
public void createTopicIfNotExists(String topic, String properties, boolean force) {}
@Override
public void destroy() {}

View File

@@ -58,6 +58,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@@ -83,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TopicService topicService;
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
@@ -118,7 +120,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final AtomicLong consumerCount = new AtomicLong();
private final AtomicLong edgeConsumerCount = new AtomicLong();
public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
public KafkaMonolithQueueFactory(TopicService topicService,
TbKafkaSettings kafkaSettings,
KafkaAdmin kafkaAdmin,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@@ -134,6 +138,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TasksQueueConfig tasksQueueConfig) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.kafkaAdmin = kafkaAdmin;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@@ -240,7 +245,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@@ -52,6 +52,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@@ -74,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TopicService topicService;
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
@@ -99,6 +101,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
KafkaAdmin kafkaAdmin,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@@ -111,6 +114,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.kafkaAdmin = kafkaAdmin;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@@ -234,7 +238,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@@ -0,0 +1,29 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({java.lang.annotation.ElementType.TYPE, java.lang.annotation.ElementType.METHOD})
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
public @interface TbKafkaComponent {}
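The annotation is plain shorthand: because @ConditionalOnProperty is applied as a meta-annotation, every bean marked @TbKafkaComponent is registered only when queue.type=kafka, replacing the repeated @ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") seen in the removed lines throughout this commit. A usage sketch with a hypothetical bean:

@TbKafkaComponent // registered only when queue.type=kafka
@Component
public class MyKafkaOnlyBean {
    // Kafka-specific wiring goes here; the bean simply does not exist for other queue types.
}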

View File

@@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
@SpringBootTest(classes = TbKafkaSettings.class)
@SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class})
@TestPropertySource(properties = {
"queue.type=kafka",
"queue.kafka.bootstrap.servers=localhost:9092",

View File

@@ -0,0 +1,38 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class CachedValue<V> {
private final LoadingCache<Object, V> cache;
public CachedValue(Supplier<V> supplier, long valueTtlMs) {
this.cache = Caffeine.newBuilder()
.expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS)
.build(__ -> supplier.get());
}
public V get() {
return cache.get(this);
}
}
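This is the TTL cache behind KafkaAdmin's topic set: a single-entry Caffeine cache keyed by the CachedValue instance itself, so get() returns the loaded value until valueTtlMs elapses and then re-invokes the supplier. A usage sketch, where fetchTopics is a hypothetical supplier:

// Re-runs fetchTopics() at most once per 5 minutes, no matter how often get() is called.
CachedValue<Set<String>> topics = new CachedValue<>(this::fetchTopics, TimeUnit.MINUTES.toMillis(5));
Set<String> names = topics.get(); // first call loads; later calls within the TTL hit the cache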