diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 90d4056afc..3620ad3639 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -97,7 +97,10 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) .build(); - super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); + super.stateService = KafkaQueueStateService., TbProtoQueueMsg>builder() + .eventConsumer(eventConsumer) + .stateConsumer(stateConsumer) + .build(); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 7d5a0cb0fd..cd46c4ba96 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -59,7 +59,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.discovery.HashPartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.environment.DistributedLock; import org.thingsboard.server.queue.environment.DistributedLockService; import org.thingsboard.server.queue.provider.EdqsClientQueueFactory; @@ -96,7 +95,7 @@ public class DefaultEdqsService implements EdqsService { private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); eventsProducer = EdqsProducer.builder() - .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS)) + .producer(queueFactory.createEdqsEventsProducer()) .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); @@ -148,9 +147,12 @@ public class DefaultEdqsService implements EdqsService { syncLock.lock(); try { EdqsSyncState syncState = getSyncState(); - if (syncState != null && syncState.getStatus() == EdqsSyncStatus.FINISHED) { - log.info("EDQS sync is already finished"); - return; + if (syncState != null) { + EdqsSyncStatus status = syncState.getStatus(); + if (status == EdqsSyncStatus.FINISHED || status == EdqsSyncStatus.FAILED) { + log.info("EDQS sync is already " + status + ", ignoring the msg"); + return; + } } saveSyncState(EdqsSyncStatus.STARTED); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index ad7b7b970d..239fd9dc42 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -19,7 +19,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -37,7 +36,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(EdqsQueue.EVENTS.getTopic()) + .topic(edqsConfig.getEventsTopic()) .partition(partition) .build().getFullTopicName()) .collect(Collectors.toSet())); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index bce1992932..125a06299d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -54,7 +54,7 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService; +import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; @@ -71,7 +71,7 @@ import java.util.stream.Collectors; @Service @TbRuleEngineComponent @Slf4j -public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPartitionedService implements TbCalculatedFieldConsumerService { +public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService implements TbCalculatedFieldConsumerService { @Value("${queue.calculated_fields.poll_interval:25}") private long pollInterval; @@ -99,7 +99,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar } @Override - protected void doAfterStartUp() { + protected void onStartUp() { var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); PartitionedQueueConsumerManager> eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) @@ -126,7 +126,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar } @Override - protected void processPartitionChangeEvent(PartitionChangeEvent event) { + protected void onPartitionChangeEvent(PartitionChangeEvent event) { try { event.getNewPartitions().forEach((queueKey, partitions) -> { if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) { @@ -143,11 +143,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar } } - @Override - protected String getPrefix() { - return "tb-cf"; - } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( @@ -195,6 +190,11 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar return ServiceType.TB_RULE_ENGINE; } + @Override + protected String getPrefix() { + return "tb-cf"; + } + @Override protected long getNotificationPollDuration() { return pollInterval; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 3a9be3874a..98f857750a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -49,7 +49,7 @@ import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService; +import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; @@ -66,7 +66,7 @@ import java.util.stream.Collectors; @Service @TbRuleEngineComponent @Slf4j -public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitionedService implements TbRuleEngineConsumerService { +public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService implements TbRuleEngineConsumerService { private final TbRuleEngineConsumerContext ctx; private final QueueService queueService; @@ -93,7 +93,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio } @Override - protected void doAfterStartUp() { + protected void onStartUp() { List queues = queueService.findAllQueues(); for (Queue configuration : queues) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { @@ -104,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio } @Override - protected void processPartitionChangeEvent(PartitionChangeEvent event) { + protected void onPartitionChangeEvent(PartitionChangeEvent event) { event.getNewPartitions().forEach((queueKey, partitions) -> { if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) { return; @@ -136,11 +136,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio }); } - @Override - protected String getPrefix() { - return "tb-rule-engine"; - } - @Override protected void stopConsumers() { super.stopConsumers(); @@ -153,6 +148,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio return ServiceType.TB_RULE_ENGINE; } + @Override + protected String getPrefix() { + return "tb-rule-engine"; + } + @Override protected long getNotificationPollDuration() { return ctx.getPollDuration(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerPartitionedService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractPartitionBasedConsumerService.java similarity index 62% rename from application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerPartitionedService.java rename to application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractPartitionBasedConsumerService.java index 94b5390be1..97aa81d41c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerPartitionedService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractPartitionBasedConsumerService.java @@ -31,24 +31,22 @@ import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsServ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public abstract class AbstractConsumerPartitionedService extends AbstractConsumerService { +public abstract class AbstractPartitionBasedConsumerService extends AbstractConsumerService { - private final Lock startupLock; - private volatile boolean consumersInitialized; + private final Lock startupLock = new ReentrantLock(); + private volatile boolean started = false; private PartitionChangeEvent lastPartitionChangeEvent; - public AbstractConsumerPartitionedService(ActorSystemContext actorContext, - TbTenantProfileCache tenantProfileCache, - TbDeviceProfileCache deviceProfileCache, - TbAssetProfileCache assetProfileCache, - CalculatedFieldCache calculatedFieldCache, - TbApiUsageStateService apiUsageStateService, - PartitionService partitionService, - ApplicationEventPublisher eventPublisher, - JwtSettingsService jwtSettingsService) { + public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext, + TbTenantProfileCache tenantProfileCache, + TbDeviceProfileCache deviceProfileCache, + TbAssetProfileCache assetProfileCache, + CalculatedFieldCache calculatedFieldCache, + TbApiUsageStateService apiUsageStateService, + PartitionService partitionService, + ApplicationEventPublisher eventPublisher, + JwtSettingsService jwtSettingsService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); - this.startupLock = new ReentrantLock(); - this.consumersInitialized = false; } @PostConstruct @@ -57,13 +55,14 @@ public abstract class AbstractConsumerPartitionedService { entitiesById.computeIfAbsent(ed.getEntityType(), et -> new ConcurrentHashMap<>()).put(ed.getId(), ed); } - public boolean remove(EntityData ed) { - var map = entitiesById.get(ed.getEntityType()); + public boolean remove(EntityType entityType, UUID entityId) { + var map = entitiesById.get(entityType); if (map != null) { - return map.remove(ed.getId()) != null; + return map.remove(entityId) != null; } else { return false; } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index fb15c3fc73..07575220eb 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -62,7 +62,6 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.edqs.EdqsComponent; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.edqs.EdqsQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; @@ -123,8 +122,8 @@ public class EdqsProcessor implements TbQueueHandler, }; eventConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic())) - .topic(EdqsQueue.EVENTS.getTopic()) + .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) + .topic(config.getEventsTopic()) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { @@ -133,14 +132,14 @@ public class EdqsProcessor implements TbQueueHandler, } try { ToEdqsMsg msg = queueMsg.getValue(); - process(msg, EdqsQueue.EVENTS); + process(msg, true); } catch (Exception t) { log.error("Failed to process message: {}", queueMsg, t); } } consumer.commit(); }) - .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) + .consumerCreator((config, partitionId) -> queueFactory.createEdqsEventsConsumer()) .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(consumersExecutor) .taskExecutor(taskExecutor) @@ -165,7 +164,7 @@ public class EdqsProcessor implements TbQueueHandler, try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic())); + stateService.process(withTopic(newPartitions, config.getStateTopic())); // eventsConsumer's partitions are updated by stateService responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template @@ -235,7 +234,7 @@ public class EdqsProcessor implements TbQueueHandler, return response; } - public void process(ToEdqsMsg edqsMsg, EdqsQueue queue) { + public void process(ToEdqsMsg edqsMsg, boolean backup) { log.trace("Processing message: {}", edqsMsg); if (edqsMsg.hasEventMsg()) { EdqsEventMsg eventMsg = edqsMsg.getEventMsg(); @@ -252,7 +251,7 @@ public class EdqsProcessor implements TbQueueHandler, } else if (!ObjectType.unversionedTypes.contains(objectType)) { log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key); } - if (queue != EdqsQueue.STATE) { + if (backup) { stateService.save(tenantId, objectType, key, eventType, edqsMsg); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java index 870574a786..b9f3589280 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java @@ -150,8 +150,9 @@ public class TenantRepo { } } else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) { if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) { - ((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entity.getFrom().getId(), CustomerData::new)) - .addOrUpdate(getEntityMap(EntityType.DASHBOARD).get(entity.getTo().getId())); + CustomerData customerData = (CustomerData) getOrCreate(entity.getFrom()); + EntityData dashboardData = getOrCreate(entity.getTo()); + customerData.addOrUpdate(dashboardData); } } } finally { @@ -170,8 +171,10 @@ public class TenantRepo { } } else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) { if (EntityRelation.CONTAINS_TYPE.equals(entityRelation.getType()) && entityRelation.getFrom().getEntityType() == EntityType.CUSTOMER) { - ((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entityRelation.getFrom().getId(), CustomerData::new)) - .remove(getEntityMap(EntityType.DASHBOARD).get(entityRelation.getTo().getId())); + CustomerData customerData = (CustomerData) get(entityRelation.getFrom()); + if (customerData != null) { + customerData.remove(EntityType.DASHBOARD, entityRelation.getTo().getId()); + } } } } @@ -197,13 +200,13 @@ public class TenantRepo { entityData.setCustomerId(newCustomerId); if (entityIdMismatch(oldCustomerId, newCustomerId)) { if (oldCustomerId != null) { - CustomerData old = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(oldCustomerId); + CustomerData old = (CustomerData) get(EntityType.CUSTOMER, oldCustomerId); if (old != null) { - old.remove(entityData); + old.remove(entityType, entityId); } } if (newCustomerId != null) { - CustomerData newData = (CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(newCustomerId, CustomerData::new); + CustomerData newData = (CustomerData) getOrCreate(EntityType.CUSTOMER, newCustomerId); newData.addOrUpdate(entityData); } } @@ -223,11 +226,12 @@ public class TenantRepo { getEntitySet(entityType).remove(removed); } edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED)); + UUID customerId = removed.getCustomerId(); if (customerId != null) { - CustomerData customerData = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(customerId); + CustomerData customerData = (CustomerData) get(EntityType.CUSTOMER, customerId); if (customerData != null) { - customerData.remove(removed); + customerData.remove(entityType, entityId); } } } @@ -303,7 +307,11 @@ public class TenantRepo { } private EntityData get(EntityId entityId) { - return getEntityMap(entityId.getEntityType()).get(entityId.getId()); + return get(entityId.getEntityType(), entityId.getId()); + } + + private EntityData get(EntityType entityType, UUID entityId) { + return getEntityMap(entityType).get(entityId); } private EntityData constructEntityData(EntityType entityType, UUID id) { @@ -425,7 +433,7 @@ public class TenantRepo { EntityType entityType = entityId.getEntityType(); return switch (entityType) { case CUSTOMER, TENANT -> { - EntityFields fields = getEntityMap(entityType).get(entityId.getId()).getFields(); + EntityFields fields = get(entityId).getFields(); yield fields != null ? fields.getName() : ""; } default -> throw new RuntimeException("Unsupported entity type: " + entityType); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index c59707c9c3..efdb1ead1c 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -36,12 +36,14 @@ import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.state.QueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; -import org.thingsboard.server.queue.edqs.EdqsQueueFactory; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; +import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -56,8 +58,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; - private final EdqsQueueFactory queueFactory; - private final TopicService topicService; + private final KafkaEdqsQueueFactory queueFactory; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -73,15 +74,16 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { + TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic())) - .topic(EdqsQueue.STATE.getTopic()) + .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) + .topic(config.getStateTopic()) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { try { ToEdqsMsg msg = queueMsg.getValue(); - edqsProcessor.process(msg, EdqsQueue.STATE); + edqsProcessor.process(msg, false); if (stateReadCount.incrementAndGet() % 100000 == 0) { log.info("[state] Processed {} msgs", stateReadCount.get()); } @@ -91,15 +93,15 @@ public class KafkaEdqsStateService implements EdqsStateService { } consumer.commit(); }) - .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE)) - .queueAdmin(queueFactory.getEdqsQueueAdmin()) + .consumerCreator((config, partitionId) -> queueFactory.createEdqsStateConsumer()) + .queueAdmin(queueAdmin) .consumerExecutor(eventConsumer.getConsumerExecutor()) .taskExecutor(eventConsumer.getTaskExecutor()) .scheduler(eventConsumer.getScheduler()) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); - queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); + TbKafkaConsumerTemplate> eventsToBackupKafkaConsumer = queueFactory.createEdqsEventsToBackupConsumer(); eventsToBackupConsumer = QueueConsumerManager.>builder() .name("edqs-events-to-backup-consumer") .pollInterval(config.getPollInterval()) @@ -137,15 +139,35 @@ public class KafkaEdqsStateService implements EdqsStateService { } consumer.commit(); }) - .consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group + .consumerCreator(() -> eventsToBackupKafkaConsumer) .consumerExecutor(eventConsumer.getConsumerExecutor()) .threadPrefix("edqs-events-to-backup") .build(); stateProducer = EdqsProducer.builder() - .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE)) + .producer(queueFactory.createEdqsStateProducer()) .partitionService(partitionService) .build(); + + queueStateService = KafkaQueueStateService., TbProtoQueueMsg>builder() + .eventConsumer(eventConsumer) + .stateConsumer(stateConsumer) + .eventsStartOffsetsProvider(() -> { + // taking start offsets for events topics from the events-to-backup consumer group, + // since eventConsumer doesn't use consumer group management and thus offset tracking + // (because we need to be able to consume the same topic-partition by multiple instances) + Map offsets = new HashMap<>(); + try { + queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) + .forEach((topicPartition, offsetAndMetadata) -> { + offsets.put(topicPartition.topic(), offsetAndMetadata.offset()); + }); + } catch (Exception e) { + log.error("Failed to get consumer group offsets for {}", eventsToBackupKafkaConsumer.getGroupId(), e); + } + return offsets; + }) + .build(); } @Override @@ -153,7 +175,7 @@ public class KafkaEdqsStateService implements EdqsStateService { if (queueStateService.getPartitions().isEmpty()) { Set allPartitions = IntStream.range(0, config.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(EdqsQueue.EVENTS.getTopic()) + .topic(config.getEventsTopic()) .partition(partition) .build()) .collect(Collectors.toSet()); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index 383115ddf1..cde21edfaf 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -29,7 +29,6 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; import java.util.Set; @@ -61,14 +60,14 @@ public class LocalEdqsStateService implements EdqsStateService { try { ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg); - processor.process(edqsMsg, EdqsQueue.STATE); + processor.process(edqsMsg, false); } catch (Exception e) { log.error("[{}] Failed to restore value", key, e); } }); log.info("Restore completed"); } - eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic())); + eventConsumer.update(withTopic(partitions, eventConsumer.getTopic())); this.partitions = partitions; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index f09826e9b6..b18debaf49 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -90,9 +90,7 @@ public class TopicPartitionInfo { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TopicPartitionInfo that = (TopicPartitionInfo) o; - return topic.equals(that.topic) && - Objects.equals(tenantId, that.tenantId) && - Objects.equals(partition, that.partition) && + return Objects.equals(partition, that.partition) && fullTopicName.equals(that.fullTopicName); } diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfoTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfoTest.java index ec1e106dd8..f222a5a094 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfoTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfoTest.java @@ -18,12 +18,16 @@ package org.thingsboard.server.common.msg.queue; import org.junit.jupiter.api.Test; import org.thingsboard.server.common.data.id.TenantId; +import java.util.UUID; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; public class TopicPartitionInfoTest { + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + @Test public void givenTopicPartitionInfo_whenEquals_thenTrue() { @@ -52,13 +56,13 @@ public class TopicPartitionInfoTest { assertThat(TopicPartitionInfo.builder() .topic("tb_core") - .tenantId(TenantId.SYS_TENANT_ID) + .tenantId(tenantId) .partition(4) .myPartition(true) //will ignored on equals .build() , is(TopicPartitionInfo.builder() .topic("tb_core") - .tenantId(TenantId.SYS_TENANT_ID) + .tenantId(tenantId) .partition(4) .myPartition(true) //will ignored on equals .build())); @@ -109,7 +113,7 @@ public class TopicPartitionInfoTest { assertThat(TopicPartitionInfo.builder() .topic("tb_core") - .tenantId(TenantId.SYS_TENANT_ID) + .tenantId(tenantId) .partition(4) .myPartition(true) //will ignored on equals .build() @@ -117,7 +121,7 @@ public class TopicPartitionInfoTest { assertThat(TopicPartitionInfo.builder() .topic("tb_core") - .tenantId(TenantId.SYS_TENANT_ID) + .tenantId(tenantId) .partition(4) .myPartition(false) //will ignored on equals .build() diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 14394bbbe9..ef9728344c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -26,6 +26,7 @@ import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask; import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.Collection; import java.util.Collections; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; @Slf4j public class MainQueueConsumerManager { @@ -296,7 +298,7 @@ public class MainQueueConsumerManager removedPartitions) { @@ -304,13 +306,19 @@ public class MainQueueConsumerManager Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion)); } - protected void addPartitions(Set partitions, Consumer onStop) { + protected void addPartitions(Set partitions, Consumer onStop, Function startOffsetProvider) { partitions.forEach(tpi -> { Integer partitionId = tpi.getPartition().orElse(-1); String key = queueKey + "-" + partitionId; Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null; - TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback); + TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> { + TbQueueConsumer queueConsumer = consumerCreator.apply(config, partitionId); + if (startOffsetProvider != null && queueConsumer instanceof TbKafkaConsumerTemplate kafkaConsumer) { + kafkaConsumer.setStartOffsetProvider(startOffsetProvider); + } + return queueConsumer; + }, callback); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index f25a98adf4..0de1e53753 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; @Slf4j public class PartitionedQueueConsumerManager extends MainQueueConsumerManager { @@ -57,7 +58,7 @@ public class PartitionedQueueConsumerManager extends MainQ protected void processTask(TbQueueConsumerManagerTask task) { if (task instanceof AddPartitionsTask addPartitionsTask) { log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions()); - consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop()); + consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop(), addPartitionsTask.startOffsetProvider()); } else if (task instanceof RemovePartitionsTask removePartitionsTask) { log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions()); consumerWrapper.removePartitions(removePartitionsTask.partitions()); @@ -76,11 +77,11 @@ public class PartitionedQueueConsumerManager extends MainQ } public void addPartitions(Set partitions) { - addPartitions(partitions, null); + addPartitions(partitions, null, null); } - public void addPartitions(Set partitions, Consumer onStop) { - addTask(new AddPartitionsTask(partitions, onStop)); + public void addPartitions(Set partitions, Consumer onStop, Function startOffsetProvider) { + addTask(new AddPartitionsTask(partitions, onStop, startOffsetProvider)); } public void removePartitions(Set partitions) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java index e0dd9b808b..a287a391af 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; public interface TbQueueConsumerManagerTask { @@ -46,7 +47,9 @@ public interface TbQueueConsumerManagerTask { } } - record AddPartitionsTask(Set partitions, Consumer onStop) implements TbQueueConsumerManagerTask { + record AddPartitionsTask(Set partitions, + Consumer onStop, + Function startOffsetProvider) implements TbQueueConsumerManagerTask { @Override public QueueTaskType getType() { return QueueTaskType.ADD_PARTITIONS; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java index 9adc6bb996..bf02afe86c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java @@ -15,13 +15,16 @@ */ package org.thingsboard.server.queue.common.state; +import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; +import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; @@ -29,14 +32,21 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class KafkaQueueStateService extends QueueStateService { private final PartitionedQueueConsumerManager stateConsumer; + private final Supplier> eventsStartOffsetsProvider; - public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, PartitionedQueueConsumerManager stateConsumer) { + @Builder + public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, + PartitionedQueueConsumerManager stateConsumer, + Supplier> eventsStartOffsetsProvider) { super(eventConsumer); this.stateConsumer = stateConsumer; + this.eventsStartOffsetsProvider = eventsStartOffsetsProvider; } @Override protected void addPartitions(QueueKey queueKey, Set partitions) { + Map eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states + Set statePartitions = withTopic(partitions, stateConsumer.getTopic()); partitionsInProgress.addAll(statePartitions); stateConsumer.addPartitions(statePartitions, statePartition -> { @@ -51,12 +61,12 @@ public class KafkaQueueStateService TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic()); if (this.partitions.get(queueKey).contains(eventPartition)) { - eventConsumer.addPartitions(Set.of(eventPartition)); + eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null); } } finally { readLock.unlock(); } - }); + }, null); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 5992083d85..2a0fad0643 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -103,6 +103,9 @@ public class TopicService { } public String buildTopicName(String topic) { + if (topic == null) { + return null; + } return prefix.isBlank() ? topic : prefix + "." + topic; } @@ -113,9 +116,9 @@ public class TopicService { public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { return this.buildTopicName( servicePrefix + queueName - + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId)) - + "-consumer" - + suffix(partitionId)); + + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId)) + + "-consumer" + + suffix(partitionId)); } String suffix(Integer partitionId) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index f7a4d2abf6..cf9f27ee39 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -315,7 +315,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi ScheduledFuture task = delayedTasks.remove(serviceId); if (task != null) { if (task.cancel(false)) { - log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", + log.info("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", serviceId, serviceTypesList); } else { log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java index e4e1e81815..401b451f59 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java @@ -30,6 +30,10 @@ public class EdqsConfig { @Value("#{'${queue.edqs.partitioning_strategy:tenant}'.toUpperCase()}") private EdqsPartitioningStrategy partitioningStrategy; + @Value("${queue.edqs.events_topic:edqs.events}") + private String eventsTopic; + @Value("${queue.edqs.state_topic:edqs.state}") + private String stateTopic; @Value("${queue.edqs.requests_topic:edqs.requests}") private String requestsTopic; @Value("${queue.edqs.responses_topic:edqs.responses}") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java deleted file mode 100644 index d859b50994..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.edqs; - -import lombok.Getter; - -@Getter -public enum EdqsQueue { - - EVENTS("edqs.events", false, false), - STATE("edqs.state", true, true); - - private final String topic; - private final boolean readFromBeginning; - private final boolean stopWhenRead; - - EdqsQueue(String topic, boolean readFromBeginning, boolean stopWhenRead) { - this.topic = topic; - this.readFromBeginning = readFromBeginning; - this.stopWhenRead = stopWhenRead; - } - -} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java index b5541c740b..5c0d68779a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java @@ -25,11 +25,13 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface EdqsQueueFactory { - TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue); + TbQueueConsumer> createEdqsEventsConsumer(); - TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group); + TbQueueConsumer> createEdqsEventsToBackupConsumer(); - TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue); + TbQueueConsumer> createEdqsStateConsumer(); + + TbQueueProducer> createEdqsStateProducer(); TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java index 0801399c14..8c670e66c0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java @@ -43,24 +43,23 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { private final TbQueueAdmin queueAdmin; @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { - if (queue == EdqsQueue.STATE) { - throw new UnsupportedOperationException(); - } - return new InMemoryTbQueueConsumer<>(storage, queue.getTopic()); + public TbQueueConsumer> createEdqsEventsConsumer() { + return new InMemoryTbQueueConsumer<>(storage, edqsConfig.getEventsTopic()); } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group) { - return createEdqsMsgConsumer(queue); + public TbQueueConsumer> createEdqsEventsToBackupConsumer() { + throw new UnsupportedOperationException(); } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { - if (queue == EdqsQueue.STATE) { - throw new UnsupportedOperationException(); - } - return new InMemoryTbQueueProducer<>(storage, queue.getTopic()); + public TbQueueConsumer> createEdqsStateConsumer() { + throw new UnsupportedOperationException(); + } + + @Override + public TbQueueProducer> createEdqsStateProducer() { + throw new UnsupportedOperationException(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index a322cc5434..ab88943b10 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -19,11 +19,8 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; -import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate; @@ -71,55 +68,68 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { - String consumerGroup = "edqs-" + queue.name().toLowerCase() + "-consumer-group-" + serviceInfoProvider.getServiceId(); - return createEdqsMsgConsumer(queue, consumerGroup); + public TbKafkaConsumerTemplate> createEdqsEventsConsumer() { + return createEdqsMsgConsumer(edqsConfig.getEventsTopic(), + "edqs-events-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(), + null, // not using consumer group management, offsets from the edqs-events-to-backup-consumer-group are used (see KafkaEdqsStateService) + false, edqsEventsAdmin); } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group) { + public TbKafkaConsumerTemplate> createEdqsEventsToBackupConsumer() { + return createEdqsMsgConsumer(edqsConfig.getEventsTopic(), + "edqs-events-to-backup-consumer-" + serviceInfoProvider.getServiceId(), + "edqs-events-to-backup-consumer-group", + false, edqsEventsAdmin); + } + + @Override + public TbKafkaConsumerTemplate> createEdqsStateConsumer() { + return createEdqsMsgConsumer(edqsConfig.getStateTopic(), + "edqs-state-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(), + null, // not using consumer group management + true, edqsStateAdmin); + } + + public TbKafkaConsumerTemplate> createEdqsMsgConsumer(String topic, String clientId, String group, boolean readFullAndStop, TbKafkaAdmin admin) { return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) - .topic(topicService.buildTopicName(queue.getTopic())) - .readFromBeginning(queue.isReadFromBeginning()) - .stopWhenRead(queue.isStopWhenRead()) - .clientId("edqs-" + queue.name().toLowerCase() + "-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId()) + .topic(topicService.buildTopicName(topic)) + .readFromBeginning(readFullAndStop) + .stopWhenRead(readFullAndStop) + .clientId(clientId) .groupId(topicService.buildTopicName(group)) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) + .admin(admin) .statsService(consumerStatsService) .build(); } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsStateProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-state-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getStateTopic())) .settings(kafkaSettings) - .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) + .admin(edqsStateAdmin) .build(); } @Override public TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate() { - String requestsConsumerGroup = "edqs-requests-consumer-group-" + edqsConfig.getLabel(); - var requestConsumer = TbKafkaConsumerTemplate.>builder() - .settings(kafkaSettings) - .topic(topicService.buildTopicName(edqsConfig.getRequestsTopic())) - .clientId("edqs-requests-consumer-" + serviceInfoProvider.getServiceId()) - .groupId(topicService.buildTopicName(requestsConsumerGroup)) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(edqsRequestsAdmin) - .statsService(consumerStatsService); + var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(), + "edqs-requests-consumer-" + serviceInfoProvider.getServiceId(), + "edqs-requests-consumer-group", + false, edqsRequestsAdmin); var responseProducer = TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic())) - .admin(edqsRequestsAdmin); + .admin(edqsRequestsAdmin) + .build(); return DefaultTbQueueResponseTemplate., TbProtoQueueMsg>builder() - .requestTemplate(requestConsumer.build()) - .responseTemplate(responseProducer.build()) + .requestTemplate(requestConsumer) + .responseTemplate(responseProducer) .maxPendingRequests(edqsConfig.getMaxPendingRequests()) .requestTimeout(edqsConfig.getMaxRequestTimeout()) .pollInterval(edqsConfig.getPollInterval()) @@ -129,7 +139,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueAdmin getEdqsQueueAdmin() { + public TbKafkaAdmin getEdqsQueueAdmin() { return edqsEventsAdmin; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 3496aac76a..37f881b49c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.queue.kafka; +import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -45,6 +47,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { private final TbKafkaSettings settings; private final Map topicConfigs; + @Getter private final int numPartitions; private volatile Set topics; @@ -157,8 +160,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { if (partitionId == null) { return; } - Map oldOffsets = - settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + Map oldOffsets = getConsumerGroupOffsets(fatGroupId); if (oldOffsets.isEmpty()) { return; } @@ -169,8 +171,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { continue; } var om = consumerOffset.getValue(); - Map newOffsets = - settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + Map newOffsets = getConsumerGroupOffsets(newGroupId); var existingOffset = newOffsets.get(tp); if (existingOffset == null) { @@ -187,6 +188,11 @@ public class TbKafkaAdmin implements TbQueueAdmin { } } + @SneakyThrows + public Map getConsumerGroupOffsets(String groupId) { + return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + } + public boolean isTopicEmpty(String topic) { return areAllTopicsEmpty(Set.of(topic)); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 4bd3bf0fe6..3abed8475a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -16,6 +16,8 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -39,7 +41,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Created by ashvayka on 24.09.18. @@ -47,13 +51,16 @@ import java.util.stream.Collectors; @Slf4j public class TbKafkaConsumerTemplate extends AbstractTbQueueConsumerTemplate, T> { - private final TbQueueAdmin admin; + private final TbKafkaAdmin admin; private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; private final TbKafkaConsumerStatsService statsService; + @Getter private final String groupId; + @Setter + private Function startOffsetProvider; private final boolean readFromBeginning; // reset offset to beginning private final boolean stopWhenRead; // stop consuming when reached end offset remembered on start private int readCount; @@ -78,7 +85,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue statsService.registerClientGroup(groupId); } - this.admin = admin; + this.admin = (TbKafkaAdmin) admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; this.readFromBeginning = readFromBeginning; @@ -105,14 +112,19 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue List toSubscribe = new ArrayList<>(); topics.forEach((topic, kafkaPartitions) -> { if (kafkaPartitions == null) { - toSubscribe.add(topic); - } else { - List topicPartitions = kafkaPartitions.stream() - .map(partition -> new TopicPartition(topic, partition)) - .toList(); - consumer.assign(topicPartitions); - onPartitionsAssigned(topicPartitions); + if (groupId != null) { + toSubscribe.add(topic); + return; + } else { // if no consumer group management - manually assigning all topic partitions + kafkaPartitions = IntStream.range(0, admin.getNumPartitions()).boxed().toList(); + } } + + List topicPartitions = kafkaPartitions.stream() + .map(partition -> new TopicPartition(topic, partition)) + .toList(); + consumer.assign(topicPartitions); + onPartitionsAssigned(topicPartitions); }); if (!toSubscribe.isEmpty()) { if (readFromBeginning || stopWhenRead) { @@ -179,9 +191,21 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private void onPartitionsAssigned(Collection partitions) { if (readFromBeginning) { + log.debug("Seeking to beginning for {}", partitions); consumer.seekToBeginning(partitions); + } else if (startOffsetProvider != null) { + partitions.forEach(topicPartition -> { + Long offset = startOffsetProvider.apply(topicPartition.topic()); + if (offset != null) { + log.debug("Seeking to offset {} for {}", offset, topicPartition); + consumer.seek(topicPartition, offset); + } else { + log.info("No start offset provided for {}", topicPartition); + } + }); } if (stopWhenRead) { + log.debug("Getting end offsets for {}", partitions); endOffsets = consumer.endOffsets(partitions).entrySet().stream() .filter(entry -> entry.getValue() > 0) .collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue)); @@ -195,7 +219,9 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected void doCommit() { - consumer.commitSync(); + if (groupId != null) { + consumer.commitSync(); + } } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java index 95be49f82b..f2309f7224 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java @@ -20,11 +20,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.edqs.EdqsQueue; public interface EdqsClientQueueFactory { - TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue); + TbQueueProducer> createEdqsEventsProducer(); TbQueueRequestTemplate, TbProtoQueueMsg> createEdqsRequestTemplate(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index bd8b4bd4f8..89d83af826 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; @@ -239,8 +238,8 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { - return new InMemoryTbQueueProducer<>(storage, queue.getTopic()); + public TbQueueProducer> createEdqsEventsProducer() { + return new InMemoryTbQueueProducer<>(storage, edqsConfig.getEventsTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index c50344a23f..dcebe085b3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -54,7 +54,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -590,10 +589,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 2a1dc4171f..e9c42b0022 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -52,7 +52,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -480,10 +479,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 2e342b0dd2..d43ef5c9ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -48,7 +48,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; +import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -79,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueEdgeSettings edgeSettings; private final TbQueueCalculatedFieldSettings calculatedFieldSettings; + private final EdqsConfig edqsConfig; private final TbQueueAdmin coreAdmin; private final TbKafkaAdmin ruleEngineAdmin; @@ -101,7 +102,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbKafkaConsumerStatsService consumerStatsService, TbQueueTransportNotificationSettings transportNotificationSettings, - TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings, + TbQueueEdgeSettings edgeSettings, + TbQueueCalculatedFieldSettings calculatedFieldSettings, + EdqsConfig edqsConfig, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; @@ -113,6 +116,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.transportNotificationSettings = transportNotificationSettings; this.edgeSettings = edgeSettings; this.calculatedFieldSettings = calculatedFieldSettings; + this.edqsConfig = edqsConfig; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -385,10 +389,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index c101eff68e..353d76391b 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -57,6 +57,10 @@ queue: partitions: "${TB_EDQS_PARTITIONS:12}" # EDQS partitioning strategy: tenant (partitions are resolved and distributed by tenant id) or none (partitions are resolved by message key; each instance has all the partitions) partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" + # EDQS events topic + events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}" + # EDQS state topic + state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}" # EDQS requests topic requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}" # EDQS responses topic @@ -149,7 +153,7 @@ queue: # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: # Kafka properties for EDQS events topics - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" # Kafka properties for EDQS requests topic (default: 3 minutes retention) edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for EDQS state topic (infinite retention, compaction)