Merge pull request #13023 from thingsboard/fix/edqs

Refactoring and fixes for EDQS
This commit is contained in:
Viacheslav Klimov 2025-03-28 14:45:53 +02:00 committed by GitHub
commit c2c99d05db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 300 additions and 226 deletions

View File

@ -97,7 +97,10 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
.scheduler(eventConsumer.getScheduler())
.taskExecutor(eventConsumer.getTaskExecutor())
.build();
super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
super.stateService = KafkaQueueStateService.<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>>builder()
.eventConsumer(eventConsumer)
.stateConsumer(stateConsumer)
.build();
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
}

View File

@ -59,7 +59,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.discovery.HashPartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.environment.DistributedLock;
import org.thingsboard.server.queue.environment.DistributedLockService;
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory;
@ -96,7 +95,7 @@ public class DefaultEdqsService implements EdqsService {
private void init() {
executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass());
eventsProducer = EdqsProducer.builder()
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS))
.producer(queueFactory.createEdqsEventsProducer())
.partitionService(edqsPartitionService)
.build();
syncLock = distributedLockService.getLock("edqs_sync");
@ -148,9 +147,12 @@ public class DefaultEdqsService implements EdqsService {
syncLock.lock();
try {
EdqsSyncState syncState = getSyncState();
if (syncState != null && syncState.getStatus() == EdqsSyncStatus.FINISHED) {
log.info("EDQS sync is already finished");
return;
if (syncState != null) {
EdqsSyncStatus status = syncState.getStatus();
if (status == EdqsSyncStatus.FINISHED || status == EdqsSyncStatus.FAILED) {
log.info("EDQS sync is already " + status + ", ignoring the msg");
return;
}
}
saveSyncState(EdqsSyncStatus.STARTED);

View File

@ -19,7 +19,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
@ -37,7 +36,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(EdqsQueue.EVENTS.getTopic())
.topic(edqsConfig.getEventsTopic())
.partition(partition)
.build().getFullTopicName())
.collect(Collectors.toSet()));

View File

@ -54,7 +54,7 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
@ -71,7 +71,7 @@ import java.util.stream.Collectors;
@Service
@TbRuleEngineComponent
@Slf4j
public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPartitionedService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
@Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval;
@ -99,7 +99,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
}
@Override
protected void doAfterStartUp() {
protected void onStartUp() {
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(queueKey)
@ -126,7 +126,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
}
@Override
protected void processPartitionChangeEvent(PartitionChangeEvent event) {
protected void onPartitionChangeEvent(PartitionChangeEvent event) {
try {
event.getNewPartitions().forEach((queueKey, partitions) -> {
if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) {
@ -143,11 +143,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
}
}
@Override
protected String getPrefix() {
return "tb-cf";
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(
@ -195,6 +190,11 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
return ServiceType.TB_RULE_ENGINE;
}
@Override
protected String getPrefix() {
return "tb-cf";
}
@Override
protected long getNotificationPollDuration() {
return pollInterval;

View File

@ -49,7 +49,7 @@ import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
@ -66,7 +66,7 @@ import java.util.stream.Collectors;
@Service
@TbRuleEngineComponent
@Slf4j
public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitionedService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
private final TbRuleEngineConsumerContext ctx;
private final QueueService queueService;
@ -93,7 +93,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
}
@Override
protected void doAfterStartUp() {
protected void onStartUp() {
List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
@ -104,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
}
@Override
protected void processPartitionChangeEvent(PartitionChangeEvent event) {
protected void onPartitionChangeEvent(PartitionChangeEvent event) {
event.getNewPartitions().forEach((queueKey, partitions) -> {
if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
return;
@ -136,11 +136,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
});
}
@Override
protected String getPrefix() {
return "tb-rule-engine";
}
@Override
protected void stopConsumers() {
super.stopConsumers();
@ -153,6 +148,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
return ServiceType.TB_RULE_ENGINE;
}
@Override
protected String getPrefix() {
return "tb-rule-engine";
}
@Override
protected long getNotificationPollDuration() {
return ctx.getPollDuration();

View File

@ -31,24 +31,22 @@ import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsServ
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractConsumerPartitionedService<N extends com.google.protobuf.GeneratedMessageV3> extends AbstractConsumerService<N> {
public abstract class AbstractPartitionBasedConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends AbstractConsumerService<N> {
private final Lock startupLock;
private volatile boolean consumersInitialized;
private final Lock startupLock = new ReentrantLock();
private volatile boolean started = false;
private PartitionChangeEvent lastPartitionChangeEvent;
public AbstractConsumerPartitionedService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache,
TbDeviceProfileCache deviceProfileCache,
TbAssetProfileCache assetProfileCache,
CalculatedFieldCache calculatedFieldCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService,
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService) {
public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache,
TbDeviceProfileCache deviceProfileCache,
TbAssetProfileCache assetProfileCache,
CalculatedFieldCache calculatedFieldCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService,
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
this.startupLock = new ReentrantLock();
this.consumersInitialized = false;
}
@PostConstruct
@ -57,13 +55,14 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
@Override
public void afterStartUp() {
super.afterStartUp();
doAfterStartUp();
onStartUp();
startupLock.lock();
try {
processPartitionChangeEvent(lastPartitionChangeEvent);
consumersInitialized = true;
onPartitionChangeEvent(lastPartitionChangeEvent);
started = true;
} finally {
startupLock.unlock();
}
@ -71,10 +70,10 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (!consumersInitialized) {
if (!started) {
startupLock.lock();
try {
if (!consumersInitialized) {
if (!started) {
lastPartitionChangeEvent = event;
return;
}
@ -82,12 +81,12 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
startupLock.unlock();
}
}
processPartitionChangeEvent(event);
onPartitionChangeEvent(event);
}
protected abstract void doAfterStartUp();
protected abstract void onStartUp();
protected abstract void processPartitionChangeEvent(PartitionChangeEvent event);
protected abstract void onPartitionChangeEvent(PartitionChangeEvent event);
protected abstract String getPrefix();

View File

@ -1646,7 +1646,7 @@ queue:
# Kafka properties for Calculated Field State topics
calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
# Kafka properties for EDQS events topics
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS requests topic (default: 3 minutes retention)
edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS state topic (infinite retention, compaction)
@ -1748,6 +1748,10 @@ queue:
partitions: "${TB_EDQS_PARTITIONS:12}"
# EDQS partitioning strategy: tenant (partition is resolved by tenant id) or none (no specific strategy, resolving by message key)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS events topic
events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}"
# EDQS state topic
state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic

View File

@ -17,8 +17,6 @@ package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.awaitility.Awaitility;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -73,7 +71,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;

View File

@ -50,10 +50,10 @@ public class CustomerData extends BaseEntityData<CustomerFields> {
entitiesById.computeIfAbsent(ed.getEntityType(), et -> new ConcurrentHashMap<>()).put(ed.getId(), ed);
}
public boolean remove(EntityData<?> ed) {
var map = entitiesById.get(ed.getEntityType());
public boolean remove(EntityType entityType, UUID entityId) {
var map = entitiesById.get(entityType);
if (map != null) {
return map.remove(ed.getId()) != null;
return map.remove(entityId) != null;
} else {
return false;
}

View File

@ -62,7 +62,6 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
@ -123,8 +122,8 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
};
eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic()))
.topic(EdqsQueue.EVENTS.getTopic())
.queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic()))
.topic(config.getEventsTopic())
.pollInterval(config.getPollInterval())
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
@ -133,14 +132,14 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
}
try {
ToEdqsMsg msg = queueMsg.getValue();
process(msg, EdqsQueue.EVENTS);
process(msg, true);
} catch (Exception t) {
log.error("Failed to process message: {}", queueMsg, t);
}
}
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
.consumerCreator((config, partitionId) -> queueFactory.createEdqsEventsConsumer())
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor)
@ -165,7 +164,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
try {
Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic()));
stateService.process(withTopic(newPartitions, config.getStateTopic()));
// eventsConsumer's partitions are updated by stateService
responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
@ -235,7 +234,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
return response;
}
public void process(ToEdqsMsg edqsMsg, EdqsQueue queue) {
public void process(ToEdqsMsg edqsMsg, boolean backup) {
log.trace("Processing message: {}", edqsMsg);
if (edqsMsg.hasEventMsg()) {
EdqsEventMsg eventMsg = edqsMsg.getEventMsg();
@ -252,7 +251,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
} else if (!ObjectType.unversionedTypes.contains(objectType)) {
log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key);
}
if (queue != EdqsQueue.STATE) {
if (backup) {
stateService.save(tenantId, objectType, key, eventType, edqsMsg);
}

View File

@ -150,8 +150,9 @@ public class TenantRepo {
}
} else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) {
if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) {
((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entity.getFrom().getId(), CustomerData::new))
.addOrUpdate(getEntityMap(EntityType.DASHBOARD).get(entity.getTo().getId()));
CustomerData customerData = (CustomerData) getOrCreate(entity.getFrom());
EntityData<?> dashboardData = getOrCreate(entity.getTo());
customerData.addOrUpdate(dashboardData);
}
}
} finally {
@ -170,8 +171,10 @@ public class TenantRepo {
}
} else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) {
if (EntityRelation.CONTAINS_TYPE.equals(entityRelation.getType()) && entityRelation.getFrom().getEntityType() == EntityType.CUSTOMER) {
((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entityRelation.getFrom().getId(), CustomerData::new))
.remove(getEntityMap(EntityType.DASHBOARD).get(entityRelation.getTo().getId()));
CustomerData customerData = (CustomerData) get(entityRelation.getFrom());
if (customerData != null) {
customerData.remove(EntityType.DASHBOARD, entityRelation.getTo().getId());
}
}
}
}
@ -197,13 +200,13 @@ public class TenantRepo {
entityData.setCustomerId(newCustomerId);
if (entityIdMismatch(oldCustomerId, newCustomerId)) {
if (oldCustomerId != null) {
CustomerData old = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(oldCustomerId);
CustomerData old = (CustomerData) get(EntityType.CUSTOMER, oldCustomerId);
if (old != null) {
old.remove(entityData);
old.remove(entityType, entityId);
}
}
if (newCustomerId != null) {
CustomerData newData = (CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(newCustomerId, CustomerData::new);
CustomerData newData = (CustomerData) getOrCreate(EntityType.CUSTOMER, newCustomerId);
newData.addOrUpdate(entityData);
}
}
@ -223,11 +226,12 @@ public class TenantRepo {
getEntitySet(entityType).remove(removed);
}
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED));
UUID customerId = removed.getCustomerId();
if (customerId != null) {
CustomerData customerData = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(customerId);
CustomerData customerData = (CustomerData) get(EntityType.CUSTOMER, customerId);
if (customerData != null) {
customerData.remove(removed);
customerData.remove(entityType, entityId);
}
}
}
@ -303,7 +307,11 @@ public class TenantRepo {
}
private EntityData<?> get(EntityId entityId) {
return getEntityMap(entityId.getEntityType()).get(entityId.getId());
return get(entityId.getEntityType(), entityId.getId());
}
private EntityData<?> get(EntityType entityType, UUID entityId) {
return getEntityMap(entityType).get(entityId);
}
private EntityData<?> constructEntityData(EntityType entityType, UUID id) {
@ -425,7 +433,7 @@ public class TenantRepo {
EntityType entityType = entityId.getEntityType();
return switch (entityType) {
case CUSTOMER, TENANT -> {
EntityFields fields = getEntityMap(entityType).get(entityId.getId()).getFields();
EntityFields fields = get(entityId).getFields();
yield fields != null ? fields.getName() : "";
}
default -> throw new RuntimeException("Unsupported entity type: " + entityType);

View File

@ -36,12 +36,14 @@ import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@ -56,8 +58,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config;
private final EdqsPartitionService partitionService;
private final EdqsQueueFactory queueFactory;
private final TopicService topicService;
private final KafkaEdqsQueueFactory queueFactory;
@Autowired @Lazy
private EdqsProcessor edqsProcessor;
@ -73,15 +74,16 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic()))
.topic(EdqsQueue.STATE.getTopic())
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
.topic(config.getStateTopic())
.pollInterval(config.getPollInterval())
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
try {
ToEdqsMsg msg = queueMsg.getValue();
edqsProcessor.process(msg, EdqsQueue.STATE);
edqsProcessor.process(msg, false);
if (stateReadCount.incrementAndGet() % 100000 == 0) {
log.info("[state] Processed {} msgs", stateReadCount.get());
}
@ -91,15 +93,15 @@ public class KafkaEdqsStateService implements EdqsStateService {
}
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE))
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerCreator((config, partitionId) -> queueFactory.createEdqsStateConsumer())
.queueAdmin(queueAdmin)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler())
.uncaughtErrorHandler(edqsProcessor.getErrorHandler())
.build();
queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> eventsToBackupKafkaConsumer = queueFactory.createEdqsEventsToBackupConsumer();
eventsToBackupConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.name("edqs-events-to-backup-consumer")
.pollInterval(config.getPollInterval())
@ -137,15 +139,35 @@ public class KafkaEdqsStateService implements EdqsStateService {
}
consumer.commit();
})
.consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group
.consumerCreator(() -> eventsToBackupKafkaConsumer)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.threadPrefix("edqs-events-to-backup")
.build();
stateProducer = EdqsProducer.builder()
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE))
.producer(queueFactory.createEdqsStateProducer())
.partitionService(partitionService)
.build();
queueStateService = KafkaQueueStateService.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>>builder()
.eventConsumer(eventConsumer)
.stateConsumer(stateConsumer)
.eventsStartOffsetsProvider(() -> {
// taking start offsets for the events topics from the events-to-backup consumer group,
// since eventConsumer doesn't use consumer group management and thus has no offset tracking
// (because we need to be able to consume the same topic-partition from multiple instances)
Map<String, Long> offsets = new HashMap<>();
try {
queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
.forEach((topicPartition, offsetAndMetadata) -> {
offsets.put(topicPartition.topic(), offsetAndMetadata.offset());
});
} catch (Exception e) {
log.error("Failed to get consumer group offsets for {}", eventsToBackupKafkaConsumer.getGroupId(), e);
}
return offsets;
})
.build();
}
@Override
@ -153,7 +175,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
if (queueStateService.getPartitions().isEmpty()) {
Set<TopicPartitionInfo> allPartitions = IntStream.range(0, config.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(EdqsQueue.EVENTS.getTopic())
.topic(config.getEventsTopic())
.partition(partition)
.build())
.collect(Collectors.toSet());

View File

@ -29,7 +29,6 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
import java.util.Set;
@ -61,14 +60,14 @@ public class LocalEdqsStateService implements EdqsStateService {
try {
ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg);
processor.process(edqsMsg, EdqsQueue.STATE);
processor.process(edqsMsg, false);
} catch (Exception e) {
log.error("[{}] Failed to restore value", key, e);
}
});
log.info("Restore completed");
}
eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic()));
eventConsumer.update(withTopic(partitions, eventConsumer.getTopic()));
this.partitions = partitions;
}

View File

@ -90,9 +90,7 @@ public class TopicPartitionInfo {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionInfo that = (TopicPartitionInfo) o;
return topic.equals(that.topic) &&
Objects.equals(tenantId, that.tenantId) &&
Objects.equals(partition, that.partition) &&
return Objects.equals(partition, that.partition) &&
fullTopicName.equals(that.fullTopicName);
}

View File

@ -18,12 +18,16 @@ package org.thingsboard.server.common.msg.queue;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
public class TopicPartitionInfoTest {
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
@Test
public void givenTopicPartitionInfo_whenEquals_thenTrue() {
@ -52,13 +56,13 @@ public class TopicPartitionInfoTest {
assertThat(TopicPartitionInfo.builder()
.topic("tb_core")
.tenantId(TenantId.SYS_TENANT_ID)
.tenantId(tenantId)
.partition(4)
.myPartition(true) //will be ignored on equals
.build()
, is(TopicPartitionInfo.builder()
.topic("tb_core")
.tenantId(TenantId.SYS_TENANT_ID)
.tenantId(tenantId)
.partition(4)
.myPartition(true) //will be ignored on equals
.build()));
@ -109,7 +113,7 @@ public class TopicPartitionInfoTest {
assertThat(TopicPartitionInfo.builder()
.topic("tb_core")
.tenantId(TenantId.SYS_TENANT_ID)
.tenantId(tenantId)
.partition(4)
.myPartition(true) //will be ignored on equals
.build()
@ -117,7 +121,7 @@ public class TopicPartitionInfoTest {
assertThat(TopicPartitionInfo.builder()
.topic("tb_core")
.tenantId(TenantId.SYS_TENANT_ID)
.tenantId(tenantId)
.partition(4)
.myPartition(false) //will be ignored on equals
.build()

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.Collection;
import java.util.Collections;
@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@Slf4j
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
@ -296,7 +298,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, addedPartitions, removedPartitions);
removePartitions(removedPartitions);
addPartitions(addedPartitions, null);
addPartitions(addedPartitions, null, null);
}
protected void removePartitions(Set<TopicPartitionInfo> removedPartitions) {
@ -304,13 +306,19 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion));
}
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
partitions.forEach(tpi -> {
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueKey + "-" + partitionId;
Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback);
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> {
TbQueueConsumer<M> queueConsumer = consumerCreator.apply(config, partitionId);
if (startOffsetProvider != null && queueConsumer instanceof TbKafkaConsumerTemplate<M> kafkaConsumer) {
kafkaConsumer.setStartOffsetProvider(startOffsetProvider);
}
return queueConsumer;
}, callback);
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@Slf4j
public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQueueConsumerManager<M, QueueConfig> {
@ -57,7 +58,7 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
protected void processTask(TbQueueConsumerManagerTask task) {
if (task instanceof AddPartitionsTask addPartitionsTask) {
log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions());
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop());
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop(), addPartitionsTask.startOffsetProvider());
} else if (task instanceof RemovePartitionsTask removePartitionsTask) {
log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions());
consumerWrapper.removePartitions(removePartitionsTask.partitions());
@ -76,11 +77,11 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
}
public void addPartitions(Set<TopicPartitionInfo> partitions) {
addPartitions(partitions, null);
addPartitions(partitions, null, null);
}
public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
addTask(new AddPartitionsTask(partitions, onStop));
public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
addTask(new AddPartitionsTask(partitions, onStop, startOffsetProvider));
}
public void removePartitions(Set<TopicPartitionInfo> partitions) {

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
public interface TbQueueConsumerManagerTask {
@ -46,7 +47,9 @@ public interface TbQueueConsumerManagerTask {
}
}
record AddPartitionsTask(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) implements TbQueueConsumerManagerTask {
record AddPartitionsTask(Set<TopicPartitionInfo> partitions,
Consumer<TopicPartitionInfo> onStop,
Function<String, Long> startOffsetProvider) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.ADD_PARTITIONS;

View File

@ -15,13 +15,16 @@
*/
package org.thingsboard.server.queue.common.state;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@ -29,14 +32,21 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
private final PartitionedQueueConsumerManager<S> stateConsumer;
private final Supplier<Map<String, Long>> eventsStartOffsetsProvider;
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer, PartitionedQueueConsumerManager<S> stateConsumer) {
/**
 * Creates a Kafka-backed queue state service.
 *
 * @param eventConsumer              consumer manager for the events topic (handed to the base class)
 * @param stateConsumer              consumer manager for the state topic
 * @param eventsStartOffsetsProvider optional supplier of per-topic start offsets for the event
 *                                   consumer; queried before subscribing to state partitions
 *                                   (see addPartitions); may be null
 */
@Builder
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer,
PartitionedQueueConsumerManager<S> stateConsumer,
Supplier<Map<String, Long>> eventsStartOffsetsProvider) {
super(eventConsumer);
this.stateConsumer = stateConsumer;
this.eventsStartOffsetsProvider = eventsStartOffsetsProvider;
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
Map<String, Long> eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states
Set<TopicPartitionInfo> statePartitions = withTopic(partitions, stateConsumer.getTopic());
partitionsInProgress.addAll(statePartitions);
stateConsumer.addPartitions(statePartitions, statePartition -> {
@ -51,12 +61,12 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic());
if (this.partitions.get(queueKey).contains(eventPartition)) {
eventConsumer.addPartitions(Set.of(eventPartition));
eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null);
}
} finally {
readLock.unlock();
}
});
}, null);
}
@Override

View File

@ -103,6 +103,9 @@ public class TopicService {
}
// Resolves the effective topic name: prepends the configured prefix (if any), null-safe.
public String buildTopicName(String topic) {
    if (topic == null) {
        return null;
    }
    if (prefix.isBlank()) {
        return topic;
    }
    return prefix + "." + topic;
}
@ -113,9 +116,9 @@ public class TopicService {
public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
return this.buildTopicName(
servicePrefix + queueName
+ (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId))
+ "-consumer"
+ suffix(partitionId));
+ (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId))
+ "-consumer"
+ suffix(partitionId));
}
String suffix(Integer partitionId) {

View File

@ -315,7 +315,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
ScheduledFuture<?> task = delayedTasks.remove(serviceId);
if (task != null) {
if (task.cancel(false)) {
log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].",
log.info("[{}] Recalculate partitions ignored. Service was restarted in time [{}].",
serviceId, serviceTypesList);
} else {
log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!",

View File

@ -30,6 +30,10 @@ public class EdqsConfig {
@Value("#{'${queue.edqs.partitioning_strategy:tenant}'.toUpperCase()}")
private EdqsPartitioningStrategy partitioningStrategy;
@Value("${queue.edqs.events_topic:edqs.events}")
private String eventsTopic;
@Value("${queue.edqs.state_topic:edqs.state}")
private String stateTopic;
@Value("${queue.edqs.requests_topic:edqs.requests}")
private String requestsTopic;
@Value("${queue.edqs.responses_topic:edqs.responses}")

View File

@ -1,36 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.edqs;
import lombok.Getter;
// Queues used by EDQS, each with its own consumption semantics.
public enum EdqsQueue {

    EVENTS("edqs.events", false, false),
    STATE("edqs.state", true, true);

    private final String topic;              // queue topic name
    private final boolean readFromBeginning; // reset offset to the beginning on start
    private final boolean stopWhenRead;      // stop consuming once the end offset remembered on start is reached

    EdqsQueue(String topic, boolean readFromBeginning, boolean stopWhenRead) {
        this.topic = topic;
        this.readFromBeginning = readFromBeginning;
        this.stopWhenRead = stopWhenRead;
    }

    public String getTopic() {
        return topic;
    }

    public boolean isReadFromBeginning() {
        return readFromBeginning;
    }

    public boolean isStopWhenRead() {
        return stopWhenRead;
    }

}

View File

@ -25,11 +25,13 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface EdqsQueueFactory {
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer();
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer();
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer();
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer();
TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate();

View File

@ -43,24 +43,23 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final TbQueueAdmin queueAdmin;
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
if (queue == EdqsQueue.STATE) {
throw new UnsupportedOperationException();
}
return new InMemoryTbQueueConsumer<>(storage, queue.getTopic());
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer() {
return new InMemoryTbQueueConsumer<>(storage, edqsConfig.getEventsTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group) {
return createEdqsMsgConsumer(queue);
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer() {
throw new UnsupportedOperationException();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
if (queue == EdqsQueue.STATE) {
throw new UnsupportedOperationException();
}
return new InMemoryTbQueueProducer<>(storage, queue.getTopic());
// EDQS state persistence is not supported by the in-memory queue implementation.
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer() {
throw new UnsupportedOperationException();
}
// EDQS state persistence is not supported by the in-memory queue implementation.
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer() {
throw new UnsupportedOperationException();
}
@Override

View File

@ -19,11 +19,8 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate;
@ -71,55 +68,68 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
String consumerGroup = "edqs-" + queue.name().toLowerCase() + "-consumer-group-" + serviceInfoProvider.getServiceId();
return createEdqsMsgConsumer(queue, consumerGroup);
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer() {
return createEdqsMsgConsumer(edqsConfig.getEventsTopic(),
"edqs-events-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(),
null, // not using consumer group management, offsets from the edqs-events-to-backup-consumer-group are used (see KafkaEdqsStateService)
false, edqsEventsAdmin);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group) {
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer() {
return createEdqsMsgConsumer(edqsConfig.getEventsTopic(),
"edqs-events-to-backup-consumer-" + serviceInfoProvider.getServiceId(),
"edqs-events-to-backup-consumer-group",
false, edqsEventsAdmin);
}
/**
 * Creates a consumer for the EDQS state topic. No Kafka consumer group management is used:
 * each consumer gets a unique client id (via consumerCounter) and reads the full topic from
 * the beginning, stopping when the end offset captured on start is reached.
 */
@Override
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer() {
return createEdqsMsgConsumer(edqsConfig.getStateTopic(),
"edqs-state-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(),
null, // not using consumer group management
true, edqsStateAdmin); // readFullAndStop = true: read from beginning, stop at the end offset
}
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(String topic, String clientId, String group, boolean readFullAndStop, TbKafkaAdmin admin) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(queue.getTopic()))
.readFromBeginning(queue.isReadFromBeginning())
.stopWhenRead(queue.isStopWhenRead())
.clientId("edqs-" + queue.name().toLowerCase() + "-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId())
.topic(topicService.buildTopicName(topic))
.readFromBeginning(readFullAndStop)
.stopWhenRead(readFullAndStop)
.clientId(clientId)
.groupId(topicService.buildTopicName(group))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
.admin(admin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-state-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getStateTopic()))
.settings(kafkaSettings)
.admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
.admin(edqsStateAdmin)
.build();
}
@Override
public TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate() {
String requestsConsumerGroup = "edqs-requests-consumer-group-" + edqsConfig.getLabel();
var requestConsumer = TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(edqsConfig.getRequestsTopic()))
.clientId("edqs-requests-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName(requestsConsumerGroup))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(edqsRequestsAdmin)
.statsService(consumerStatsService);
var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(),
"edqs-requests-consumer-" + serviceInfoProvider.getServiceId(),
"edqs-requests-consumer-group",
false, edqsRequestsAdmin);
var responseProducer = TbKafkaProducerTemplate.<TbProtoQueueMsg<FromEdqsMsg>>builder()
.settings(kafkaSettings)
.clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic()))
.admin(edqsRequestsAdmin);
.admin(edqsRequestsAdmin)
.build();
return DefaultTbQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.requestTemplate(requestConsumer.build())
.responseTemplate(responseProducer.build())
.requestTemplate(requestConsumer)
.responseTemplate(responseProducer)
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
.pollInterval(edqsConfig.getPollInterval())
@ -129,7 +139,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
public TbKafkaAdmin getEdqsQueueAdmin() {
return edqsEventsAdmin;
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.queue.kafka;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -45,6 +47,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
private final TbKafkaSettings settings;
private final Map<String, String> topicConfigs;
@Getter
private final int numPartitions;
private volatile Set<String> topics;
@ -157,8 +160,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
if (partitionId == null) {
return;
}
Map<TopicPartition, OffsetAndMetadata> oldOffsets =
settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
@ -169,8 +171,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets =
settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
@ -187,6 +188,11 @@ public class TbKafkaAdmin implements TbQueueAdmin {
}
}
/**
 * Returns the committed offsets for all topic partitions tracked by the given consumer group.
 * Blocks for up to 10 seconds waiting for the admin client result; checked exceptions
 * (interruption/execution/timeout) are rethrown unchecked via {@code @SneakyThrows}.
 *
 * @param groupId Kafka consumer group id
 * @return map of topic partition to committed offset metadata (empty if the group has none)
 */
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}

View File

@ -16,6 +16,8 @@
package org.thingsboard.server.queue.kafka;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@ -39,7 +41,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Created by ashvayka on 24.09.18.
@ -47,13 +51,16 @@ import java.util.stream.Collectors;
@Slf4j
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
private final TbQueueAdmin admin;
private final TbKafkaAdmin admin;
private final KafkaConsumer<String, byte[]> consumer;
private final TbKafkaDecoder<T> decoder;
private final TbKafkaConsumerStatsService statsService;
@Getter
private final String groupId;
@Setter
private Function<String, Long> startOffsetProvider;
private final boolean readFromBeginning; // reset offset to beginning
private final boolean stopWhenRead; // stop consuming when reached end offset remembered on start
private int readCount;
@ -78,7 +85,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
statsService.registerClientGroup(groupId);
}
this.admin = admin;
this.admin = (TbKafkaAdmin) admin;
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.readFromBeginning = readFromBeginning;
@ -105,14 +112,19 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
List<String> toSubscribe = new ArrayList<>();
topics.forEach((topic, kafkaPartitions) -> {
if (kafkaPartitions == null) {
toSubscribe.add(topic);
} else {
List<TopicPartition> topicPartitions = kafkaPartitions.stream()
.map(partition -> new TopicPartition(topic, partition))
.toList();
consumer.assign(topicPartitions);
onPartitionsAssigned(topicPartitions);
if (groupId != null) {
toSubscribe.add(topic);
return;
} else { // if no consumer group management - manually assigning all topic partitions
kafkaPartitions = IntStream.range(0, admin.getNumPartitions()).boxed().toList();
}
}
List<TopicPartition> topicPartitions = kafkaPartitions.stream()
.map(partition -> new TopicPartition(topic, partition))
.toList();
consumer.assign(topicPartitions);
onPartitionsAssigned(topicPartitions);
});
if (!toSubscribe.isEmpty()) {
if (readFromBeginning || stopWhenRead) {
@ -179,9 +191,21 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (readFromBeginning) {
log.debug("Seeking to beginning for {}", partitions);
consumer.seekToBeginning(partitions);
} else if (startOffsetProvider != null) {
partitions.forEach(topicPartition -> {
Long offset = startOffsetProvider.apply(topicPartition.topic());
if (offset != null) {
log.debug("Seeking to offset {} for {}", offset, topicPartition);
consumer.seek(topicPartition, offset);
} else {
log.info("No start offset provided for {}", topicPartition);
}
});
}
if (stopWhenRead) {
log.debug("Getting end offsets for {}", partitions);
endOffsets = consumer.endOffsets(partitions).entrySet().stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue));
@ -195,7 +219,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
@Override
protected void doCommit() {
consumer.commitSync();
if (groupId != null) {
consumer.commitSync();
}
}
@Override

View File

@ -20,11 +20,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.edqs.EdqsQueue;
public interface EdqsClientQueueFactory {
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue);
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer();
TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsRequestTemplate();

View File

@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
@ -239,8 +238,8 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return new InMemoryTbQueueProducer<>(storage, queue.getTopic());
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return new InMemoryTbQueueProducer<>(storage, edqsConfig.getEventsTopic());
}
@Override

View File

@ -54,7 +54,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -590,10 +589,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

View File

@ -52,7 +52,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -480,10 +479,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

View File

@ -48,7 +48,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -79,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final EdqsConfig edqsConfig;
private final TbQueueAdmin coreAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
@ -101,7 +102,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbQueueEdgeSettings edgeSettings,
TbQueueCalculatedFieldSettings calculatedFieldSettings,
EdqsConfig edqsConfig,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@ -113,6 +116,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.edqsConfig = edqsConfig;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -385,10 +389,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

View File

@ -57,6 +57,10 @@ queue:
partitions: "${TB_EDQS_PARTITIONS:12}"
# EDQS partitioning strategy: tenant (partitions are resolved and distributed by tenant id) or none (partitions are resolved by message key; each instance has all the partitions)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS events topic
events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}"
# EDQS state topic
state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic
@ -149,7 +153,7 @@ queue:
# value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
topic-properties:
# Kafka properties for the EDQS events topic
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS requests topic (default: 3 minutes retention)
edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS state topic (infinite retention, compaction)