Fixes for CF consumers and repartitioning; refactoring

This commit is contained in:
ViacheslavKlimov 2025-03-17 16:30:31 +02:00
parent b40fa86bac
commit e97accadab
19 changed files with 420 additions and 204 deletions

View File

@ -19,14 +19,20 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.exception.CalculatedFieldStateException;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto;
@ -35,12 +41,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
@Autowired
private ActorSystemContext actorSystemContext;
protected PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer;
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
this.eventConsumer = eventConsumer;
}
protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService;
@Override
public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
@ -69,4 +70,24 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state));
}
@Override
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
stateService.update(queueKey, partitions);
}
@Override
public void delete(Set<TopicPartitionInfo> partitions) {
stateService.delete(partitions);
}
@Override
public Set<TopicPartitionInfo> getPartitions() {
return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
}
@Override
public void stop() {
stateService.stop();
}
}

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.exception.CalculatedFieldStateException;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
@ -34,7 +35,11 @@ public interface CalculatedFieldStateService {
void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback);
void restore(Set<TopicPartitionInfo> partitions);
void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions);
void delete(Set<TopicPartitionInfo> partitions);
Set<TopicPartitionInfo> getPartitions();
void stop();

View File

@ -30,12 +30,13 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueStateService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
@ -43,10 +44,12 @@ import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.*;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.stringToBytes;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.uuidToBytes;
@Service
@RequiredArgsConstructor
@ -56,22 +59,19 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
private final TbRuleEngineQueueFactory queueFactory;
private final PartitionService partitionService;
private final TbQueueAdmin queueAdmin;
@Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>> stateConsumer;
private TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>> stateProducer;
private QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> queueStateService;
private final AtomicInteger counter = new AtomicInteger();
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
super.init(eventConsumer);
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME);
this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
PartitionedQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>> stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
.queueKey(queueKey)
.topic(partitionService.getTopic(queueKey))
.pollInterval(pollInterval)
@ -94,13 +94,13 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
}
})
.consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer())
.queueAdmin(queueAdmin)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.scheduler(eventConsumer.getScheduler())
.taskExecutor(eventConsumer.getTaskExecutor())
.build();
super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
this.queueStateService = new QueueStateService<>();
this.queueStateService.init(stateConsumer, super.eventConsumer);
}
@Override
@ -132,11 +132,6 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
doPersist(stateId, null, callback);
}
@Override
public void restore(Set<TopicPartitionInfo> partitions) {
queueStateService.update(partitions);
}
private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) {
headers.put("tenantId", uuidToBytes(stateId.tenantId().getId()));
headers.put("cfId", uuidToBytes(stateId.cfId().getId()));
@ -153,8 +148,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override
public void stop() {
stateConsumer.stop();
stateConsumer.awaitStop();
super.stop();
stateProducer.stop();
}

View File

@ -22,7 +22,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.common.state.DefaultQueueStateService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.CfRocksDb;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
@ -37,7 +42,10 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
private final CfRocksDb cfRocksDb;
private boolean initialized;
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
super.stateService = new DefaultQueueStateService<>(eventConsumer);
}
@Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
@ -52,8 +60,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
}
@Override
public void restore(Set<TopicPartitionInfo> partitions) {
if (!this.initialized) {
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
if (stateService.getPartitions().isEmpty()) {
cfRocksDb.forEach((key, value) -> {
try {
processRestoredState(CalculatedFieldStateProto.parseFrom(value));
@ -61,13 +69,8 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
log.error("[{}] Failed to process restored state", key, e);
}
});
this.initialized = true;
}
eventConsumer.update(partitions);
}
@Override
public void stop() {
super.restore(queueKey, partitions);
}
}

View File

@ -18,24 +18,31 @@ package org.thingsboard.server.service.queue;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
@ -54,6 +61,7 @@ import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -73,10 +81,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private long packProcessingTimeout;
private final TbRuleEngineQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final CalculatedFieldStateService stateService;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer;
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
ActorSystemContext actorContext,
TbDeviceProfileCache deviceProfileCache,
@ -87,10 +94,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache,
TbQueueAdmin queueAdmin,
CalculatedFieldStateService stateService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory;
this.queueAdmin = queueAdmin;
this.stateService = stateService;
}
@ -99,12 +108,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
super.init("tb-cf");
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(queueKey)
.topic(partitionService.getTopic(queueKey))
.pollInterval(pollInterval)
.msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
.queueAdmin(queueAdmin)
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
@ -124,9 +134,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
var partitions = event.getCfPartitions();
try {
stateService.restore(partitions);
event.getNewPartitions().forEach((queueKey, partitions) -> {
if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) {
stateService.restore(queueKey, partitions);
}
});
// eventConsumer's partitions will be updated by stateService
// Cleanup old entities after corresponding consumers are stopped.
@ -212,6 +225,21 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
}
}
@EventListener
public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) {
if (event.getEntityId().getEntityType() == EntityType.TENANT) {
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
Set<TopicPartitionInfo> partitions = stateService.getPartitions();
if (CollectionUtils.isEmpty(partitions)) {
return;
}
stateService.delete(partitions.stream()
.filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId()))
.collect(Collectors.toSet()));
}
}
}
private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
@ -232,9 +260,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected void stopConsumers() {
super.stopConsumers();
eventConsumer.stop();
eventConsumer.awaitStop();
stateService.stop();
stateService.stop(); // eventConsumer will be stopped by stateService
}
}

View File

@ -35,7 +35,7 @@ import static org.awaitility.Awaitility.await;
@DaoSqlTest
@TestPropertySource(properties = {
// "queue.type=kafka", // uncomment to use Kafka
// "queue.kafka.bootstrap.servers=10.7.1.254:9092",
// "queue.kafka.bootstrap.servers=10.7.2.107:9092",
"queue.edqs.sync.enabled=true",
"queue.edqs.api.supported=true",
"queue.edqs.api.auto_enable=true",

View File

@ -664,34 +664,38 @@ public class TenantControllerTest extends AbstractControllerTest {
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = saveTenant(savedDifferentTenant);
TenantId tenantId = differentTenantId;
await().atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId);
return !tpi.getTenantId().get().isSysTenantId();
});
TopicPartitionInfo tpi = new TopicPartitionInfo(MAIN_QUEUE_TOPIC, tenantId, 0, false);
String isolatedTopic = tpi.getFullTopicName();
TbMsg expectedMsg = publishTbMsg(tenantId, tpi);
List<TopicPartitionInfo> isolatedTpis = await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> {
List<TopicPartitionInfo> newTpis = new ArrayList<>();
newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId));
newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId));
return newTpis;
}, newTpis -> newTpis.stream().allMatch(newTpi -> newTpi.getTenantId().get().equals(tenantId)));
TbMsg expectedMsg = publishTbMsg(tenantId, isolatedTpis.get(0));
awaitTbMsg(tbMsg -> tbMsg.getId().equals(expectedMsg.getId()), 10000); // to wait for consumer start
loginSysAdmin();
tenantProfile.setIsolatedTbRuleEngine(false);
tenantProfile.getProfileData().setQueueConfiguration(Collections.emptyList());
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
await().atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId)
.getTenantId().get().isSysTenantId());
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
TopicPartitionInfo newTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId);
assertThat(newTpi.getTenantId()).hasValue(TenantId.SYS_TENANT_ID);
newTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId);
assertThat(newTpi.getTenantId()).hasValue(TenantId.SYS_TENANT_ID);
});
List<UUID> submittedMsgs = new ArrayList<>();
long timeLeft = TimeUnit.SECONDS.toMillis(7); // based on topic-deletion-delay
int msgs = 100;
for (int i = 1; i <= msgs; i++) {
TbMsg tbMsg = publishTbMsg(tenantId, tpi);
TbMsg tbMsg = publishTbMsg(tenantId, isolatedTpis.get(0));
submittedMsgs.add(tbMsg.getId());
Thread.sleep(timeLeft / msgs);
}
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic));
TopicPartitionInfo tpi = isolatedTpis.get(0);
// we only expect deletion of Rule Engine topic. for CF - the topic is left as is because queue draining is not supported
verify(queueAdmin, times(1)).deleteTopic(eq(tpi.getFullTopicName()));
});
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
@ -719,12 +723,16 @@ public class TenantControllerTest extends AbstractControllerTest {
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = saveTenant(savedDifferentTenant);
TenantId tenantId = differentTenantId;
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull();
});
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId);
assertThat(tpi.getTenantId()).hasValue(tenantId);
TbMsg tbMsg = publishTbMsg(tenantId, tpi);
List<TopicPartitionInfo> isolatedTpis = await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> {
List<TopicPartitionInfo> newTpis = new ArrayList<>();
newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId));
newTpis.add(partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, tenantId));
return newTpis;
}, newTpis -> newTpis.stream().allMatch(newTpi -> {
return newTpi.getTenantId().get().equals(tenantId) &&
newTpi.isMyPartition();
}));
TbMsg tbMsg = publishTbMsg(tenantId, isolatedTpis.get(0));
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
verify(actorContext).tell(argThat(msg -> {
return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId());
@ -738,7 +746,9 @@ public class TenantControllerTest extends AbstractControllerTest {
assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId))
.isInstanceOf(TenantNotFoundException.class);
verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName()));
isolatedTpis.forEach(tpi -> {
verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName()));
});
});
}

View File

@ -53,6 +53,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -91,6 +92,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
private final EdqsPartitionService partitionService;
private final ConfigurableApplicationContext applicationContext;
private final EdqsStateService stateService;
private final TbQueueAdmin queueAdmin;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
@ -141,6 +143,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
.queueAdmin(queueAdmin)
.consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor)
.scheduler(scheduler)

View File

@ -30,10 +30,12 @@ import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueStateService;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
@ -54,6 +56,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config;
private final EdqsPartitionService partitionService;
private final EdqsQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final TopicService topicService;
@Autowired @Lazy
private EdqsProcessor edqsProcessor;
@ -89,13 +92,13 @@ public class KafkaEdqsStateService implements EdqsStateService {
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE))
.queueAdmin(queueAdmin)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler())
.uncaughtErrorHandler(edqsProcessor.getErrorHandler())
.build();
queueStateService = new QueueStateService<>();
queueStateService.init(stateConsumer, eventConsumer);
queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
eventsToBackupConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.name("edqs-events-to-backup-consumer")
@ -149,11 +152,11 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void process(Set<TopicPartitionInfo> partitions) {
if (queueStateService.getPartitions() == null) {
if (queueStateService.getPartitions().isEmpty()) {
eventsToBackupConsumer.subscribe();
eventsToBackupConsumer.launch();
}
queueStateService.update(partitions);
queueStateService.update(new QueueKey(ServiceType.EDQS), partitions);
}
@Override

View File

@ -20,9 +20,11 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeletePartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
@ -36,17 +38,19 @@ import java.util.function.Consumer;
public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQueueConsumerManager<M, QueueConfig> {
private final ConsumerPerPartitionWrapper consumerWrapper;
private final TbQueueAdmin queueAdmin;
@Getter
private final String topic;
@Builder(builderMethodName = "create") // not to conflict with super.builder()
public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor<M, QueueConfig> msgPackProcessor,
BiFunction<QueueConfig, Integer, TbQueueConsumer<M>> consumerCreator,
BiFunction<QueueConfig, Integer, TbQueueConsumer<M>> consumerCreator, TbQueueAdmin queueAdmin,
ExecutorService consumerExecutor, ScheduledExecutorService scheduler,
ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {
super(queueKey, QueueConfig.of(true, pollInterval), msgPackProcessor, consumerCreator, consumerExecutor, scheduler, taskExecutor, uncaughtErrorHandler);
this.topic = topic;
this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper;
this.queueAdmin = queueAdmin;
}
@Override
@ -57,6 +61,17 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
} else if (task instanceof RemovePartitionsTask removePartitionsTask) {
log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions());
consumerWrapper.removePartitions(removePartitionsTask.partitions());
} else if (task instanceof DeletePartitionsTask deletePartitionsTask) {
log.info("[{}] Removing partitions and deleting topics: {}", queueKey, deletePartitionsTask.partitions());
consumerWrapper.removePartitions(deletePartitionsTask.partitions());
deletePartitionsTask.partitions().forEach(tpi -> {
String topic = tpi.getFullTopicName();
try {
queueAdmin.deleteTopic(topic);
} catch (Throwable t) {
log.error("Failed to delete topic {}", topic, t);
}
});
}
}
@ -72,4 +87,8 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
addTask(new RemovePartitionsTask(partitions));
}
public void delete(Set<TopicPartitionInfo> partitions) {
addTask(new DeletePartitionsTask(partitions));
}
}

View File

@ -1,98 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.consumer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@Slf4j
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
private PartitionedQueueConsumerManager<S> stateConsumer;
private PartitionedQueueConsumerManager<E> eventConsumer;
@Getter
private Set<TopicPartitionInfo> partitions;
private final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();
private boolean initialized;
private final ReadWriteLock partitionsLock = new ReentrantReadWriteLock();
public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) {
this.stateConsumer = stateConsumer;
this.eventConsumer = eventConsumer;
}
public void update(Set<TopicPartitionInfo> newPartitions) {
newPartitions = withTopic(newPartitions, stateConsumer.getTopic());
var writeLock = partitionsLock.writeLock();
writeLock.lock();
Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet();
Set<TopicPartitionInfo> addedPartitions;
Set<TopicPartitionInfo> removedPartitions;
try {
addedPartitions = new HashSet<>(newPartitions);
addedPartitions.removeAll(oldPartitions);
removedPartitions = new HashSet<>(oldPartitions);
removedPartitions.removeAll(newPartitions);
this.partitions = newPartitions;
} finally {
writeLock.unlock();
}
if (!removedPartitions.isEmpty()) {
stateConsumer.removePartitions(removedPartitions);
eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic()));
}
if (!addedPartitions.isEmpty()) {
partitionsInProgress.addAll(addedPartitions);
stateConsumer.addPartitions(addedPartitions, partition -> {
var readLock = partitionsLock.readLock();
readLock.lock();
try {
partitionsInProgress.remove(partition);
log.info("Finished partition {} (still in progress: {})", partition, partitionsInProgress);
if (partitionsInProgress.isEmpty()) {
log.info("All partitions processed");
}
if (this.partitions.contains(partition)) {
eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic())));
}
} finally {
readLock.unlock();
}
});
}
initialized = true;
}
public Set<TopicPartitionInfo> getPartitionsInProgress() {
return initialized ? partitionsInProgress : null;
}
}

View File

@ -20,6 +20,6 @@ import java.io.Serializable;
public enum QueueTaskType implements Serializable {
UPDATE_PARTITIONS, UPDATE_CONFIG, DELETE,
ADD_PARTITIONS, REMOVE_PARTITIONS
ADD_PARTITIONS, REMOVE_PARTITIONS, DELETE_PARTITIONS
}

View File

@ -60,4 +60,11 @@ public interface TbQueueConsumerManagerTask {
}
}
record DeletePartitionsTask(Set<TopicPartitionInfo> partitions) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.REMOVE_PARTITIONS;
}
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.state;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
public class DefaultQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
public DefaultQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer) {
super(eventConsumer);
}
}

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.state;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Set;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@Slf4j
public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
private final PartitionedQueueConsumerManager<S> stateConsumer;
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer, PartitionedQueueConsumerManager<S> stateConsumer) {
super(eventConsumer);
this.stateConsumer = stateConsumer;
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
Set<TopicPartitionInfo> statePartitions = withTopic(partitions, stateConsumer.getTopic());
partitionsInProgress.addAll(statePartitions);
stateConsumer.addPartitions(statePartitions, statePartition -> {
var readLock = partitionsLock.readLock();
readLock.lock();
try {
partitionsInProgress.remove(statePartition);
log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress);
if (partitionsInProgress.isEmpty()) {
log.info("All partitions processed");
}
TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic());
if (this.partitions.get(queueKey).contains(eventPartition)) {
eventConsumer.addPartitions(Set.of(eventPartition));
}
} finally {
readLock.unlock();
}
});
}
@Override
protected void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
super.removePartitions(queueKey, partitions);
stateConsumer.removePartitions(withTopic(partitions, stateConsumer.getTopic()));
}
@Override
protected void deletePartitions(Set<TopicPartitionInfo> partitions) {
super.deletePartitions(partitions);
stateConsumer.delete(withTopic(partitions, stateConsumer.getTopic()));
}
@Override
public void stop() {
super.stop();
stateConsumer.stop();
stateConsumer.awaitStop();
}
}

View File

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.state;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@Slf4j
public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
protected final PartitionedQueueConsumerManager<E> eventConsumer;
@Getter
protected final Map<QueueKey, Set<TopicPartitionInfo>> partitions = new HashMap<>();
protected final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();
protected boolean initialized;
protected final ReadWriteLock partitionsLock = new ReentrantReadWriteLock();
protected QueueStateService(PartitionedQueueConsumerManager<E> eventConsumer) {
this.eventConsumer = eventConsumer;
}
public void update(QueueKey queueKey, Set<TopicPartitionInfo> newPartitions) {
newPartitions = withTopic(newPartitions, eventConsumer.getTopic());
var writeLock = partitionsLock.writeLock();
writeLock.lock();
Set<TopicPartitionInfo> oldPartitions = this.partitions.getOrDefault(queueKey, Collections.emptySet());
Set<TopicPartitionInfo> addedPartitions;
Set<TopicPartitionInfo> removedPartitions;
try {
addedPartitions = new HashSet<>(newPartitions);
addedPartitions.removeAll(oldPartitions);
removedPartitions = new HashSet<>(oldPartitions);
removedPartitions.removeAll(newPartitions);
this.partitions.put(queueKey, newPartitions);
} finally {
writeLock.unlock();
}
if (!removedPartitions.isEmpty()) {
removePartitions(queueKey, removedPartitions);
}
if (!addedPartitions.isEmpty()) {
addPartitions(queueKey, addedPartitions);
}
initialized = true;
}
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
eventConsumer.addPartitions(partitions);
}
protected void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
eventConsumer.removePartitions(partitions);
}
public void delete(Set<TopicPartitionInfo> partitions) {
if (partitions.isEmpty()) {
return;
}
var writeLock = partitionsLock.writeLock();
writeLock.lock();
try {
this.partitions.values().forEach(tpis -> tpis.removeAll(partitions));
} finally {
writeLock.unlock();
}
deletePartitions(partitions);
}
protected void deletePartitions(Set<TopicPartitionInfo> partitions) {
eventConsumer.delete(withTopic(partitions, eventConsumer.getTopic()));
}
public Set<TopicPartitionInfo> getPartitionsInProgress() {
return initialized ? partitionsInProgress : null;
}
public void stop() {
eventConsumer.stop();
eventConsumer.awaitStop();
}
}

View File

@ -52,7 +52,10 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.EDGE_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
@ -159,16 +162,7 @@ public class HashPartitionService implements PartitionService {
List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos();
queueRoutingInfoList.forEach(queue -> {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.put(cfQueueKey, queue.getPartitions());
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions());
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
}
partitionTopicsMap.put(queueKey, queue.getQueueTopic());
partitionSizesMap.put(queueKey, queue.getPartitions());
updateQueue(queueKey, queue.getQueueTopic(), queue.getPartitions());
queueConfigs.put(queueKey, new QueueConfig(queue));
});
}
@ -215,16 +209,7 @@ public class HashPartitionService implements PartitionService {
QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg);
TenantId tenantId = queueRoutingInfo.getTenantId();
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions());
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions());
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
}
partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic());
partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions());
updateQueue(queueKey, queueRoutingInfo.getQueueTopic(), queueRoutingInfo.getPartitions());
queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo));
if (!tenantId.isSysTenantId()) {
tenantRoutingInfoMap.remove(tenantId);
@ -235,9 +220,15 @@ public class HashPartitionService implements PartitionService {
@Override
public void removeQueues(List<TransportProtos.QueueDeleteMsg> queueDeleteMsgs) {
List<QueueKey> queueKeys = queueDeleteMsgs.stream()
.map(queueDeleteMsg -> {
.flatMap(queueDeleteMsg -> {
TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
return new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
if (queueKey.getQueueName().equals(MAIN_QUEUE_NAME)) {
return Stream.of(queueKey, queueKey.withQueueName(CF_QUEUE_NAME),
queueKey.withQueueName(CF_STATES_QUEUE_NAME));
} else {
return Stream.of(queueKey);
}
}).toList();
queueKeys.forEach(queueKey -> {
removeQueue(queueKey);
@ -252,25 +243,38 @@ public class HashPartitionService implements PartitionService {
@Override
public void removeTenant(TenantId tenantId) {
List<QueueKey> queueKeys = partitionSizesMap.keySet().stream()
.filter(queueKey -> tenantId.equals(queueKey.getTenantId())).toList();
.filter(queueKey -> tenantId.equals(queueKey.getTenantId()))
.flatMap(queueKey -> {
if (queueKey.getQueueName().equals(MAIN_QUEUE_NAME)) {
return Stream.of(queueKey, queueKey.withQueueName(CF_QUEUE_NAME),
queueKey.withQueueName(CF_STATES_QUEUE_NAME));
} else {
return Stream.of(queueKey);
}
})
.toList();
queueKeys.forEach(this::removeQueue);
evictTenantInfo(tenantId);
}
private void updateQueue(QueueKey queueKey, String topic, int partitions) {
partitionTopicsMap.put(queueKey, topic);
partitionSizesMap.put(queueKey, partitions);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
partitionSizesMap.put(cfQueueKey, partitions);
QueueKey cfStatesQueueKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionTopicsMap.put(cfStatesQueueKey, cfStateTopic);
partitionSizesMap.put(cfStatesQueueKey, partitions);
}
}
private void removeQueue(QueueKey queueKey) {
myPartitions.remove(queueKey);
partitionTopicsMap.remove(queueKey);
partitionSizesMap.remove(queueKey);
queueConfigs.remove(queueKey);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.remove(cfQueueKey);
partitionTopicsMap.remove(cfQueueKey);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.remove(cfQueueStatesKey);
partitionTopicsMap.remove(cfQueueStatesKey);
}
}
@Override

View File

@ -23,9 +23,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import static org.thingsboard.server.common.data.DataConstants.CF_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.CF_STATES_QUEUE_NAME;
@Data
@AllArgsConstructor
public class QueueKey {

View File

@ -91,7 +91,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
@Override
public void deleteTopic(String topic) {
Set<String> topics = getTopics();
if (topics.contains(topic)) {
if (topics.remove(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
try {