diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index 0163d65d98..91c08ab6e0 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -19,14 +19,20 @@ import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.exception.CalculatedFieldStateException; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.common.state.QueueStateService; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; @@ -35,12 +41,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Autowired private ActorSystemContext actorSystemContext; - protected PartitionedQueueConsumerManager> eventConsumer; - - @Override - public void init(PartitionedQueueConsumerManager> eventConsumer) { - this.eventConsumer = eventConsumer; - } + protected QueueStateService, TbProtoQueueMsg> stateService; @Override public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { @@ -69,4 +70,24 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); } + @Override + public void restore(QueueKey queueKey, Set partitions) { + stateService.update(queueKey, partitions); + } + + @Override + public void delete(Set partitions) { + stateService.delete(partitions); + } + + @Override + public Set getPartitions() { + return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + } + + @Override + public void stop() { + stateService.stop(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index 109f13f183..d0b34f18e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.exception.CalculatedFieldStateException; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -34,7 +35,11 @@ public interface CalculatedFieldStateService { void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); - void restore(Set partitions); + void restore(QueueKey queueKey, Set partitions); + + void delete(Set partitions); + + Set getPartitions(); void stop(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 557768e9c9..76cd2cfcf3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,12 +30,13 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; -import org.thingsboard.server.queue.common.consumer.QueueStateService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; @@ -43,10 +44,12 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.*; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.stringToBytes; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.uuidToBytes; @Service @RequiredArgsConstructor @@ -56,22 +59,19 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta private final TbRuleEngineQueueFactory queueFactory; private final PartitionService partitionService; + private final TbQueueAdmin queueAdmin; @Value("${queue.calculated_fields.poll_interval:25}") private long pollInterval; - private PartitionedQueueConsumerManager> stateConsumer; private TbKafkaProducerTemplate> stateProducer; - private QueueStateService, TbProtoQueueMsg> queueStateService; private final AtomicInteger counter = new AtomicInteger(); @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - super.init(eventConsumer); - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); - this.stateConsumer = PartitionedQueueConsumerManager.>create() + PartitionedQueueConsumerManager> stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) @@ -94,13 +94,13 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta } }) .consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer()) + .queueAdmin(queueAdmin) .consumerExecutor(eventConsumer.getConsumerExecutor()) .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) .build(); + super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); - this.queueStateService = new QueueStateService<>(); - this.queueStateService.init(stateConsumer, super.eventConsumer); } @Override @@ -132,11 +132,6 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta doPersist(stateId, null, callback); } - @Override - public void restore(Set partitions) { - queueStateService.update(partitions); - } - private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) { headers.put("tenantId", uuidToBytes(stateId.tenantId().getId())); headers.put("cfId", uuidToBytes(stateId.cfId().getId())); @@ -153,8 +148,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void stop() { - stateConsumer.stop(); - stateConsumer.awaitStop(); + super.stop(); stateProducer.stop(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index a508eecada..0eaa506dfd 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -22,7 +22,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.common.state.DefaultQueueStateService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.CfRocksDb; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; @@ -37,7 +42,10 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS private final CfRocksDb cfRocksDb; - private boolean initialized; + @Override + public void init(PartitionedQueueConsumerManager> eventConsumer) { + super.stateService = new DefaultQueueStateService<>(eventConsumer); + } @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { @@ -52,8 +60,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS } @Override - public void restore(Set partitions) { - if (!this.initialized) { + public void restore(QueueKey queueKey, Set partitions) { + if (stateService.getPartitions().isEmpty()) { cfRocksDb.forEach((key, value) -> { try { processRestoredState(CalculatedFieldStateProto.parseFrom(value)); @@ -61,13 +69,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS log.error("[{}] Failed to process restored state", key, e); } }); - this.initialized = true; } - eventConsumer.update(partitions); - } - - @Override - public void stop() { + super.restore(queueKey, partitions); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 04aab6b39d..fe42684b2b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -18,24 +18,31 @@ package org.thingsboard.server.service.queue; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; @@ -54,6 +61,7 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -73,10 +81,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; + private final TbQueueAdmin queueAdmin; private final CalculatedFieldStateService stateService; - private PartitionedQueueConsumerManager> eventConsumer; - public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, TbDeviceProfileCache deviceProfileCache, @@ -87,10 +94,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, + TbQueueAdmin queueAdmin, CalculatedFieldStateService stateService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; + this.queueAdmin = queueAdmin; this.stateService = stateService; } @@ -99,12 +108,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer super.init("tb-cf"); var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); - this.eventConsumer = PartitionedQueueConsumerManager.>create() + PartitionedQueueConsumerManager> eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) + .queueAdmin(queueAdmin) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) @@ -124,9 +134,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { - var partitions = event.getCfPartitions(); try { - stateService.restore(partitions); + event.getNewPartitions().forEach((queueKey, partitions) -> { + if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) { + stateService.restore(queueKey, partitions); + } + }); // eventConsumer's partitions will be updated by stateService // Cleanup old entities after corresponding consumers are stopped. @@ -212,6 +225,21 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer } } + @EventListener + public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { + if (event.getEntityId().getEntityType() == EntityType.TENANT) { + if (event.getEvent() == ComponentLifecycleEvent.DELETED) { + Set partitions = stateService.getPartitions(); + if (CollectionUtils.isEmpty(partitions)) { + return; + } + stateService.delete(partitions.stream() + .filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId())) + .collect(Collectors.toSet())); + } + } + } + private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); @@ -232,9 +260,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void stopConsumers() { super.stopConsumers(); - eventConsumer.stop(); - eventConsumer.awaitStop(); - stateService.stop(); + stateService.stop(); // eventConsumer will be stopped by stateService } } diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index ce5221ab89..153ec2d26f 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -35,7 +35,7 @@ import static org.awaitility.Awaitility.await; @DaoSqlTest @TestPropertySource(properties = { // "queue.type=kafka", // uncomment to use Kafka -// "queue.kafka.bootstrap.servers=10.7.1.254:9092", +// "queue.kafka.bootstrap.servers=10.7.2.107:9092", "queue.edqs.sync.enabled=true", "queue.edqs.api.supported=true", "queue.edqs.api.auto_enable=true", diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 3f2434c4ec..4fd613dc1d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -664,34 +664,38 @@ public class TenantControllerTest extends AbstractControllerTest { savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); savedDifferentTenant = saveTenant(savedDifferentTenant); TenantId tenantId = differentTenantId; - await().atMost(TIMEOUT, TimeUnit.SECONDS) - .until(() -> { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId); - return !tpi.getTenantId().get().isSysTenantId(); - }); - TopicPartitionInfo tpi = new TopicPartitionInfo(MAIN_QUEUE_TOPIC, tenantId, 0, false); - String isolatedTopic = tpi.getFullTopicName(); - TbMsg expectedMsg = publishTbMsg(tenantId, tpi); + List isolatedTpis = await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> { + List newTpis = new ArrayList<>(); + newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId)); + newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId)); + return newTpis; + }, newTpis -> newTpis.stream().allMatch(newTpi -> newTpi.getTenantId().get().equals(tenantId))); + TbMsg expectedMsg = publishTbMsg(tenantId, isolatedTpis.get(0)); awaitTbMsg(tbMsg -> tbMsg.getId().equals(expectedMsg.getId()), 10000); // to wait for consumer start loginSysAdmin(); tenantProfile.setIsolatedTbRuleEngine(false); tenantProfile.getProfileData().setQueueConfiguration(Collections.emptyList()); tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); - await().atMost(TIMEOUT, TimeUnit.SECONDS) - .until(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId) - .getTenantId().get().isSysTenantId()); + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + TopicPartitionInfo newTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId); + assertThat(newTpi.getTenantId()).hasValue(TenantId.SYS_TENANT_ID); + newTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId); + assertThat(newTpi.getTenantId()).hasValue(TenantId.SYS_TENANT_ID); + }); List submittedMsgs = new ArrayList<>(); long timeLeft = TimeUnit.SECONDS.toMillis(7); // based on topic-deletion-delay int msgs = 100; for (int i = 1; i <= msgs; i++) { - TbMsg tbMsg = publishTbMsg(tenantId, tpi); + TbMsg tbMsg = publishTbMsg(tenantId, isolatedTpis.get(0)); submittedMsgs.add(tbMsg.getId()); Thread.sleep(timeLeft / msgs); } await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { - verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic)); + TopicPartitionInfo tpi = isolatedTpis.get(0); + // we only expect deletion of Rule Engine topic. for CF - the topic is left as is because queue draining is not supported + verify(queueAdmin, times(1)).deleteTopic(eq(tpi.getFullTopicName())); }); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { @@ -719,12 +723,16 @@ public class TenantControllerTest extends AbstractControllerTest { savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); savedDifferentTenant = saveTenant(savedDifferentTenant); TenantId tenantId = differentTenantId; - await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { - assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull(); - }); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId); - assertThat(tpi.getTenantId()).hasValue(tenantId); - TbMsg tbMsg = publishTbMsg(tenantId, tpi); + List isolatedTpis = await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> { + List newTpis = new ArrayList<>(); + newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId)); + newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId)); + return newTpis; + }, newTpis -> newTpis.stream().allMatch(newTpi -> { + return newTpi.getTenantId().get().equals(tenantId) && + newTpi.isMyPartition(); + })); + TbMsg tbMsg = publishTbMsg(tenantId, isolatedTpis.get(0)); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { verify(actorContext).tell(argThat(msg -> { return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId()); @@ -738,7 +746,9 @@ public class TenantControllerTest extends AbstractControllerTest { assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId)) .isInstanceOf(TenantNotFoundException.class); - verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName())); + isolatedTpis.forEach(tpi -> { + verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName())); + }); }); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index c5696c08f7..e18c8171af 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -53,6 +53,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -91,6 +92,7 @@ public class EdqsProcessor implements TbQueueHandler, private final EdqsPartitionService partitionService; private final ConfigurableApplicationContext applicationContext; private final EdqsStateService stateService; + private final TbQueueAdmin queueAdmin; private PartitionedQueueConsumerManager> eventConsumer; private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; @@ -141,6 +143,7 @@ public class EdqsProcessor implements TbQueueHandler, consumer.commit(); }) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) + .queueAdmin(queueAdmin) .consumerExecutor(consumersExecutor) .taskExecutor(taskExecutor) .scheduler(scheduler) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 80b6eebf5c..6d938d2976 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -30,10 +30,12 @@ import org.thingsboard.server.edqs.processor.EdqsProducer; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; -import org.thingsboard.server.queue.common.consumer.QueueStateService; +import org.thingsboard.server.queue.common.state.KafkaQueueStateService; +import org.thingsboard.server.queue.common.state.QueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; @@ -54,6 +56,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final EdqsQueueFactory queueFactory; + private final TbQueueAdmin queueAdmin; private final TopicService topicService; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -89,13 +92,13 @@ public class KafkaEdqsStateService implements EdqsStateService { consumer.commit(); }) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE)) + .queueAdmin(queueAdmin) .consumerExecutor(eventConsumer.getConsumerExecutor()) .taskExecutor(eventConsumer.getTaskExecutor()) .scheduler(eventConsumer.getScheduler()) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); - queueStateService = new QueueStateService<>(); - queueStateService.init(stateConsumer, eventConsumer); + queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); eventsToBackupConsumer = QueueConsumerManager.>builder() .name("edqs-events-to-backup-consumer") @@ -149,11 +152,11 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void process(Set partitions) { - if (queueStateService.getPartitions() == null) { + if (queueStateService.getPartitions().isEmpty()) { eventsToBackupConsumer.subscribe(); eventsToBackupConsumer.launch(); } - queueStateService.update(partitions); + queueStateService.update(new QueueKey(ServiceType.EDQS), partitions); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index 1d68cc9c4e..f25a98adf4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -20,9 +20,11 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeletePartitionsTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask; import org.thingsboard.server.queue.discovery.QueueKey; @@ -36,17 +38,19 @@ import java.util.function.Consumer; public class PartitionedQueueConsumerManager extends MainQueueConsumerManager { private final ConsumerPerPartitionWrapper consumerWrapper; + private final TbQueueAdmin queueAdmin; @Getter private final String topic; @Builder(builderMethodName = "create") // not to conflict with super.builder() public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor msgPackProcessor, - BiFunction> consumerCreator, + BiFunction> consumerCreator, TbQueueAdmin queueAdmin, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor, Consumer uncaughtErrorHandler) { super(queueKey, QueueConfig.of(true, pollInterval), msgPackProcessor, consumerCreator, consumerExecutor, scheduler, taskExecutor, uncaughtErrorHandler); this.topic = topic; this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper; + this.queueAdmin = queueAdmin; } @Override @@ -57,6 +61,17 @@ public class PartitionedQueueConsumerManager extends MainQ } else if (task instanceof RemovePartitionsTask removePartitionsTask) { log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions()); consumerWrapper.removePartitions(removePartitionsTask.partitions()); + } else if (task instanceof DeletePartitionsTask deletePartitionsTask) { + log.info("[{}] Removing partitions and deleting topics: {}", queueKey, deletePartitionsTask.partitions()); + consumerWrapper.removePartitions(deletePartitionsTask.partitions()); + deletePartitionsTask.partitions().forEach(tpi -> { + String topic = tpi.getFullTopicName(); + try { + queueAdmin.deleteTopic(topic); + } catch (Throwable t) { + log.error("Failed to delete topic {}", topic, t); + } + }); } } @@ -72,4 +87,8 @@ public class PartitionedQueueConsumerManager extends MainQ addTask(new RemovePartitionsTask(partitions)); } + public void delete(Set partitions) { + addTask(new DeletePartitionsTask(partitions)); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java deleted file mode 100644 index 8870ff2a2c..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.common.consumer; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.queue.TbQueueMsg; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; - -@Slf4j -public class QueueStateService { - - private PartitionedQueueConsumerManager stateConsumer; - private PartitionedQueueConsumerManager eventConsumer; - - @Getter - private Set partitions; - private final Set partitionsInProgress = ConcurrentHashMap.newKeySet(); - private boolean initialized; - - private final ReadWriteLock partitionsLock = new ReentrantReadWriteLock(); - - public void init(PartitionedQueueConsumerManager stateConsumer, PartitionedQueueConsumerManager eventConsumer) { - this.stateConsumer = stateConsumer; - this.eventConsumer = eventConsumer; - } - - public void update(Set newPartitions) { - newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); - var writeLock = partitionsLock.writeLock(); - writeLock.lock(); - Set oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); - Set addedPartitions; - Set removedPartitions; - try { - addedPartitions = new HashSet<>(newPartitions); - addedPartitions.removeAll(oldPartitions); - removedPartitions = new HashSet<>(oldPartitions); - removedPartitions.removeAll(newPartitions); - this.partitions = newPartitions; - } finally { - writeLock.unlock(); - } - - if (!removedPartitions.isEmpty()) { - stateConsumer.removePartitions(removedPartitions); - eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic())); - } - - if (!addedPartitions.isEmpty()) { - partitionsInProgress.addAll(addedPartitions); - stateConsumer.addPartitions(addedPartitions, partition -> { - var readLock = partitionsLock.readLock(); - readLock.lock(); - try { - partitionsInProgress.remove(partition); - log.info("Finished partition {} (still in progress: {})", partition, partitionsInProgress); - if (partitionsInProgress.isEmpty()) { - log.info("All partitions processed"); - } - if (this.partitions.contains(partition)) { - eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic()))); - } - } finally { - readLock.unlock(); - } - }); - } - initialized = true; - } - - public Set getPartitionsInProgress() { - return initialized ? partitionsInProgress : null; - } - -} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java index 93601146e9..84cb3c9382 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java @@ -20,6 +20,6 @@ import java.io.Serializable; public enum QueueTaskType implements Serializable { UPDATE_PARTITIONS, UPDATE_CONFIG, DELETE, - ADD_PARTITIONS, REMOVE_PARTITIONS + ADD_PARTITIONS, REMOVE_PARTITIONS, DELETE_PARTITIONS } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java index 3380dd7e31..e0dd9b808b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java @@ -60,4 +60,11 @@ public interface TbQueueConsumerManagerTask { } } + record DeletePartitionsTask(Set partitions) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.REMOVE_PARTITIONS; + } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java new file mode 100644 index 0000000000..be019caaa7 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common.state; + +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; + +public class DefaultQueueStateService extends QueueStateService { + + public DefaultQueueStateService(PartitionedQueueConsumerManager eventConsumer) { + super(eventConsumer); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java new file mode 100644 index 0000000000..9adc6bb996 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common.state; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; + +import java.util.Set; + +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; + +@Slf4j +public class KafkaQueueStateService extends QueueStateService { + + private final PartitionedQueueConsumerManager stateConsumer; + + public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, PartitionedQueueConsumerManager stateConsumer) { + super(eventConsumer); + this.stateConsumer = stateConsumer; + } + + @Override + protected void addPartitions(QueueKey queueKey, Set partitions) { + Set statePartitions = withTopic(partitions, stateConsumer.getTopic()); + partitionsInProgress.addAll(statePartitions); + stateConsumer.addPartitions(statePartitions, statePartition -> { + var readLock = partitionsLock.readLock(); + readLock.lock(); + try { + partitionsInProgress.remove(statePartition); + log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress); + if (partitionsInProgress.isEmpty()) { + log.info("All partitions processed"); + } + + TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic()); + if (this.partitions.get(queueKey).contains(eventPartition)) { + eventConsumer.addPartitions(Set.of(eventPartition)); + } + } finally { + readLock.unlock(); + } + }); + } + + @Override + protected void removePartitions(QueueKey queueKey, Set partitions) { + super.removePartitions(queueKey, partitions); + stateConsumer.removePartitions(withTopic(partitions, stateConsumer.getTopic())); + } + + @Override + protected void deletePartitions(Set partitions) { + super.deletePartitions(partitions); + stateConsumer.delete(withTopic(partitions, stateConsumer.getTopic())); + } + + @Override + public void stop() { + super.stop(); + stateConsumer.stop(); + stateConsumer.awaitStop(); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java new file mode 100644 index 0000000000..29426fab63 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java @@ -0,0 +1,114 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common.state; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; + +@Slf4j +public abstract class QueueStateService { + + protected final PartitionedQueueConsumerManager eventConsumer; + + @Getter + protected final Map> partitions = new HashMap<>(); + protected final Set partitionsInProgress = ConcurrentHashMap.newKeySet(); + protected boolean initialized; + + protected final ReadWriteLock partitionsLock = new ReentrantReadWriteLock(); + + protected QueueStateService(PartitionedQueueConsumerManager eventConsumer) { + this.eventConsumer = eventConsumer; + } + + public void update(QueueKey queueKey, Set newPartitions) { + newPartitions = withTopic(newPartitions, eventConsumer.getTopic()); + var writeLock = partitionsLock.writeLock(); + writeLock.lock(); + Set oldPartitions = this.partitions.getOrDefault(queueKey, Collections.emptySet()); + Set addedPartitions; + Set removedPartitions; + try { + addedPartitions = new HashSet<>(newPartitions); + addedPartitions.removeAll(oldPartitions); + removedPartitions = new HashSet<>(oldPartitions); + removedPartitions.removeAll(newPartitions); + this.partitions.put(queueKey, newPartitions); + } finally { + writeLock.unlock(); + } + + if (!removedPartitions.isEmpty()) { + removePartitions(queueKey, removedPartitions); + } + + if (!addedPartitions.isEmpty()) { + addPartitions(queueKey, addedPartitions); + } + initialized = true; + } + + protected void addPartitions(QueueKey queueKey, Set partitions) { + eventConsumer.addPartitions(partitions); + } + + protected void removePartitions(QueueKey queueKey, Set partitions) { + eventConsumer.removePartitions(partitions); + } + + public void delete(Set partitions) { + if (partitions.isEmpty()) { + return; + } + var writeLock = partitionsLock.writeLock(); + writeLock.lock(); + try { + this.partitions.values().forEach(tpis -> tpis.removeAll(partitions)); + } finally { + writeLock.unlock(); + } + deletePartitions(partitions); + } + + protected void deletePartitions(Set partitions) { + eventConsumer.delete(withTopic(partitions, eventConsumer.getTopic())); + } + + public Set getPartitionsInProgress() { + return initialized ? partitionsInProgress : null; + } + + public void stop() { + eventConsumer.stop(); + eventConsumer.awaitStop(); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 345b44e764..3a76d50825 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -52,7 +52,10 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME; import static org.thingsboard.server.common.data.DataConstants.EDGE_QUEUE_NAME; import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; @@ -159,16 +162,7 @@ public class HashPartitionService implements PartitionService { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); - if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { - QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); - partitionSizesMap.put(cfQueueKey, queue.getPartitions()); - partitionTopicsMap.put(cfQueueKey, cfEventTopic); - QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); - partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions()); - partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); - } - partitionTopicsMap.put(queueKey, queue.getQueueTopic()); - partitionSizesMap.put(queueKey, queue.getPartitions()); + updateQueue(queueKey, queue.getQueueTopic(), queue.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queue)); }); } @@ -215,16 +209,7 @@ public class HashPartitionService implements PartitionService { QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg); TenantId tenantId = queueRoutingInfo.getTenantId(); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId); - if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { - QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); - partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions()); - partitionTopicsMap.put(cfQueueKey, cfEventTopic); - QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); - partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions()); - partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); - } - partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic()); - partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions()); + updateQueue(queueKey, queueRoutingInfo.getQueueTopic(), queueRoutingInfo.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo)); if (!tenantId.isSysTenantId()) { tenantRoutingInfoMap.remove(tenantId); @@ -235,9 +220,15 @@ public class HashPartitionService implements PartitionService { @Override public void removeQueues(List queueDeleteMsgs) { List queueKeys = queueDeleteMsgs.stream() - .map(queueDeleteMsg -> { + .flatMap(queueDeleteMsg -> { TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); - return new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + if (queueKey.getQueueName().equals(MAIN_QUEUE_NAME)) { + return Stream.of(queueKey, queueKey.withQueueName(CF_QUEUE_NAME), + queueKey.withQueueName(CF_STATES_QUEUE_NAME)); + } else { + return Stream.of(queueKey); + } }).toList(); queueKeys.forEach(queueKey -> { removeQueue(queueKey); @@ -252,25 +243,38 @@ public class HashPartitionService implements PartitionService { @Override public void removeTenant(TenantId tenantId) { List queueKeys = partitionSizesMap.keySet().stream() - .filter(queueKey -> tenantId.equals(queueKey.getTenantId())).toList(); + .filter(queueKey -> tenantId.equals(queueKey.getTenantId())) + .flatMap(queueKey -> { + if (queueKey.getQueueName().equals(MAIN_QUEUE_NAME)) { + return Stream.of(queueKey, queueKey.withQueueName(CF_QUEUE_NAME), + queueKey.withQueueName(CF_STATES_QUEUE_NAME)); + } else { + return Stream.of(queueKey); + } + }) + .toList(); queueKeys.forEach(this::removeQueue); evictTenantInfo(tenantId); } + private void updateQueue(QueueKey queueKey, String topic, int partitions) { + partitionTopicsMap.put(queueKey, topic); + partitionSizesMap.put(queueKey, partitions); + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionTopicsMap.put(cfQueueKey, cfEventTopic); + partitionSizesMap.put(cfQueueKey, partitions); + QueueKey cfStatesQueueKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionTopicsMap.put(cfStatesQueueKey, cfStateTopic); + partitionSizesMap.put(cfStatesQueueKey, partitions); + } + } + private void removeQueue(QueueKey queueKey) { myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); queueConfigs.remove(queueKey); - - if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { - QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); - partitionSizesMap.remove(cfQueueKey); - partitionTopicsMap.remove(cfQueueKey); - QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); - partitionSizesMap.remove(cfQueueStatesKey); - partitionTopicsMap.remove(cfQueueStatesKey); - } } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java index 6720a9d71e..1709003ada 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java @@ -23,9 +23,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; -import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME; -import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME; - @Data @AllArgsConstructor public class QueueKey { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index f393a27ddf..4ae744be67 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -91,7 +91,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { @Override public void deleteTopic(String topic) { Set topics = getTopics(); - if (topics.contains(topic)) { + if (topics.remove(topic)) { settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); } else { try {