Refactoring in progress

This commit is contained in:
Andrii Shvaika 2023-09-18 12:49:19 +03:00 committed by ViacheslavKlimov
parent a360995e49
commit 45fd970898
6 changed files with 356 additions and 110 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -68,6 +68,7 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrateg
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.stats.RuleEngineStatisticsService; import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
import org.threadly.concurrent.wrapper.KeyDistributedExecutor;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -116,12 +117,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final QueueService queueService; private final QueueService queueService;
private final TbQueueProducerProvider producerProvider; private final TbQueueProducerProvider producerProvider;
private final TbQueueAdmin queueAdmin; private final TbQueueAdmin queueAdmin;
private final ConcurrentMap<QueueKey, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, Queue> consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap<QueueKey, TbRuleEngineQueueConsumerManager> consumerMap = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>(); private final ExecutorService consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
private final ConcurrentMap<QueueKey, TbTopicWithConsumerPerPartition> topicsConsumerPerPartition = new ConcurrentHashMap<>(); private final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit"));
final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit")); private final ScheduledExecutorService repartitionExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition"));
final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition"));
public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
TbRuleEngineSubmitStrategyFactory submitStrategyFactory, TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
@ -153,7 +154,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@PostConstruct @PostConstruct
public void init() { public void init() {
super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer"); super.init("tb-rule-engine-notifications-consumer"); // TODO: restore init of the main consumer?
List<Queue> queues = queueService.findAllQueues(); List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) { for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
@ -163,19 +164,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
private void initConsumer(Queue configuration) { private void initConsumer(Queue configuration) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration); consumerMap.computeIfAbsent(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration),
consumerConfigurations.putIfAbsent(queueKey, configuration); key -> new TbRuleEngineQueueConsumerManager(repartitionExecutor, consumersExecutor, statsFactory, tbRuleEngineQueueFactory, key)).init(configuration);
consumerStats.putIfAbsent(queueKey, new TbRuleEngineConsumerStats(configuration, statsFactory));
if (!configuration.isConsumerPerPartition()) {
consumers.computeIfAbsent(queueKey, queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
} else {
topicsConsumerPerPartition.computeIfAbsent(queueKey, k -> new TbTopicWithConsumerPerPartition(k.getQueueName()));
}
} }
@PreDestroy @PreDestroy
public void stop() { public void stop() {
super.destroy(); super.destroy();
consumersExecutor.shutdownNow(); // TODO: shutdown or shutdownNow?
submitExecutor.shutdownNow(); submitExecutor.shutdownNow();
repartitionExecutor.shutdownNow(); repartitionExecutor.shutdownNow();
} }
@ -183,119 +179,107 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (event.getServiceType().equals(getServiceType())) { if (event.getServiceType().equals(getServiceType())) {
event.getPartitionsMap().forEach((queueKey, partitions) -> { var consumer = consumerMap.get(event.getQueueKey());
String serviceQueue = queueKey.getQueueName(); if (consumer != null) {
log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions); consumer.subscribe(event);
Queue configuration = consumerConfigurations.get(queueKey); } else {
if (configuration == null) { log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
return;
}
if (!configuration.isConsumerPerPartition()) {
consumers.get(queueKey).subscribe(partitions);
} else {
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, partitions);
subscribeConsumerPerPartition(queueKey, partitions);
}
});
}
}
void subscribeConsumerPerPartition(QueueKey queue, Set<TopicPartitionInfo> partitions) {
topicsConsumerPerPartition.get(queue).getSubscribeQueue().add(partitions);
scheduleTopicRepartition(queue);
}
private void scheduleTopicRepartition(QueueKey queue) {
repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS);
}
void repartitionTopicWithConsumerPerPartition(final QueueKey queueKey) {
if (stopped) {
return;
}
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queueKey);
java.util.Queue<Set<TopicPartitionInfo>> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue();
if (subscribeQueue.isEmpty()) {
return;
}
if (tbTopicWithConsumerPerPartition.getLock().tryLock()) {
try {
Set<TopicPartitionInfo> partitions = null;
while (!subscribeQueue.isEmpty()) {
partitions = subscribeQueue.poll();
}
if (partitions == null) {
return;
}
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = tbTopicWithConsumerPerPartition.getConsumers();
addedPartitions.removeAll(consumers.keySet());
log.info("calculated addedPartitions {}", addedPartitions);
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("calculated removedPartitions {}", removedPartitions);
removedPartitions.forEach((tpi) -> {
removeConsumerForTopicByTpi(queueKey.getQueueName(), consumers, tpi);
});
addedPartitions.forEach((tpi) -> {
log.info("[{}] Adding consumer for topic: {}", queueKey, tpi);
Queue configuration = consumerConfigurations.get(queueKey);
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration);
consumers.put(tpi, consumer);
launchConsumer(consumer, consumerConfigurations.get(queueKey), consumerStats.get(queueKey), "" + queueKey + "-" + tpi.getPartition().orElse(-999999));
consumer.subscribe(Collections.singleton(tpi));
});
} finally {
tbTopicWithConsumerPerPartition.getLock().unlock();
} }
} else {
scheduleTopicRepartition(queueKey); //reschedule later
} }
} }
// void subscribeConsumerPerPartition(QueueKey queue, Set<TopicPartitionInfo> partitions) {
// topicsConsumerPerPartition.get(queue).getSubscribeQueue().add(partitions);
// scheduleTopicRepartition(queue);
// }
//
// private void scheduleTopicRepartition(QueueKey queue) {
// repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS);
// }
//
// void repartitionTopicWithConsumerPerPartition(final QueueKey queueKey) {
// if (stopped) {
// return;
// }
// TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queueKey);
// java.util.Queue<Set<TopicPartitionInfo>> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue();
// if (subscribeQueue.isEmpty()) {
// return;
// }
// if (tbTopicWithConsumerPerPartition.getLock().tryLock()) {
// try {
// Set<TopicPartitionInfo> partitions = null;
// while (!subscribeQueue.isEmpty()) {
// partitions = subscribeQueue.poll();
// }
// if (partitions == null) {
// return;
// }
//
// Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
// ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = tbTopicWithConsumerPerPartition.getConsumers();
// addedPartitions.removeAll(consumers.keySet());
// log.info("calculated addedPartitions {}", addedPartitions);
//
// Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
// removedPartitions.removeAll(partitions);
// log.info("calculated removedPartitions {}", removedPartitions);
//
// removedPartitions.forEach((tpi) -> {
// removeConsumerForTopicByTpi(queueKey.getQueueName(), consumers, tpi);
// });
//
// addedPartitions.forEach((tpi) -> {
// log.info("[{}] Adding consumer for topic: {}", queueKey, tpi);
// Queue configuration = consumerConfigurations.get(queueKey);
// TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration);
// consumers.put(tpi, consumer);
// launchConsumer(consumer, queueKey, tpi.getFullTopicName(), queueKey + "-" + tpi.getPartition().orElse(-999999));
// consumer.subscribe(Collections.singleton(tpi));
// });
// } finally {
// tbTopicWithConsumerPerPartition.getLock().unlock();
// }
// } else {
// scheduleTopicRepartition(queueKey); //reschedule later
// }
// }
void removeConsumerForTopicByTpi(String queue, ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers, TopicPartitionInfo tpi) { void removeConsumerForTopicByTpi(String queue, ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers, TopicPartitionInfo tpi) {
log.info("[{}] Removing consumer for topic: {}", queue, tpi); log.info("[{}] Removing consumer for topic: {}", queue, tpi);
consumers.get(tpi).unsubscribe(); consumers.remove(tpi).stop();
consumers.remove(tpi);
} }
@Override @Override
protected void launchMainConsumers() { protected void launchMainConsumers() {
consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), queue.getQueueName())); consumers.forEach((queue, consumer) -> launchConsumer(consumer, queue, queue, queue.getQueueName()));
} }
@Override @Override
protected void stopMainConsumers() { protected void stopMainConsumers() {
consumers.values().forEach(TbQueueConsumer::unsubscribe); consumerMap.values().forEach(TbRuleEngineQueueConsumerManager::stop);
topicsConsumerPerPartition.values().forEach(tbTopicWithConsumerPerPartition -> tbTopicWithConsumerPerPartition.getConsumers().keySet()
.forEach((tpi) -> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi)));
} }
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, QueueKey queueKey, Object consumerKey, String threadSuffix) {
if (isReady) { if (isReady) {
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); log.info("[{}] Launching consumer", consumerKey);
consumersExecutor.execute(consumerKey, () -> consumerLoop(consumer, queueKey, threadSuffix));
} else { } else {
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix); scheduleLaunchConsumer(consumer, queueKey, consumerKey, threadSuffix);
} }
} }
private void scheduleLaunchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { private void scheduleLaunchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, QueueKey queueKey, Object consumerKey, String threadSuffix) {
repartitionExecutor.schedule(() -> { repartitionExecutor.schedule(() -> {
if (isReady) { launchConsumer(consumer, queueKey, consumerKey, threadSuffix);
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
} else {
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
}
}, 10, TimeUnit.SECONDS); }, 10, TimeUnit.SECONDS);
} }
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, QueueKey queueKey, String threadSuffix) {
Queue configuration = consumerConfigurations.get(queueKey);
TbRuleEngineConsumerStats stats = consumerStats.get(queueKey);
updateCurrentThreadName(threadSuffix); updateCurrentThreadName(threadSuffix);
while (!stopped && !consumer.isStopped() && !consumer.isQueueDeleted()) { while (!stopped && !consumer.isStopped() && !consumer.isQueueDeleted()) {
try { try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(configuration.getPollInterval()); List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(configuration.getPollInterval());
@ -347,7 +331,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
} }
if (consumer.isQueueDeleted()) { if (consumer.isStopped()) {
consumer.unsubscribe();
} else if (consumer.isQueueDeleted()) {
processQueueDeletion(configuration, consumer); processQueueDeletion(configuration, consumer);
} }
log.info("TB Rule Engine Consumer stopped."); log.info("TB Rule Engine Consumer stopped.");
@ -460,20 +446,20 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
ReentrantLock lock = consumerPerPartition.getLock(); ReentrantLock lock = consumerPerPartition.getLock();
try { try {
lock.lock(); lock.lock();
consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::stop);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
} else { } else {
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey); TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
consumer.unsubscribe(); consumer.stop();
} }
} }
initConsumer(queue); initConsumer(queue);
if (!queue.isConsumerPerPartition()) { if (!queue.isConsumerPerPartition()) {
launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName); launchConsumer(consumers.get(queueKey), queueKey, queueKey, queueName);
} }
} }
@ -502,6 +488,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
} }
} }
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
} }
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {

View File

@ -0,0 +1,37 @@
package org.thingsboard.server.service.queue;
import lombok.Data;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Data
public class TbQueueConsumerLauncher {
private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer;
private volatile Future<?> task;
public void stop() {
this.consumer.stop();
}
public void awaitStopped() throws ExecutionException, InterruptedException, TimeoutException {
if (task != null) {
this.task.get(3, TimeUnit.MINUTES);
}
}
public void subscribe(Set<TopicPartitionInfo> partitions) {
this.consumer.subscribe(partitions);
}
}

View File

@ -0,0 +1,17 @@
package org.thingsboard.server.service.queue;
import lombok.Data;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import java.util.Set;
@Data
public class TbQueueConsumerManagerTask {
private final ComponentLifecycleEvent event;
private final Queue queue;
private final Set<TopicPartitionInfo> partitions;
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import java.util.ArrayList; import java.util.ArrayList;
@ -66,8 +67,8 @@ public class TbRuleEngineConsumerStats {
private final String queueName; private final String queueName;
private final TenantId tenantId; private final TenantId tenantId;
public TbRuleEngineConsumerStats(Queue queue, StatsFactory statsFactory) { public TbRuleEngineConsumerStats(QueueKey queue, StatsFactory statsFactory) {
this.queueName = queue.getName(); this.queueName = queue.getQueueName();
this.tenantId = queue.getTenantId(); this.tenantId = queue.getTenantId();
this.statsFactory = statsFactory; this.statsFactory = statsFactory;

View File

@ -0,0 +1,205 @@
package org.thingsboard.server.service.queue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Data
@Slf4j
public class TbRuleEngineQueueConsumerManager {
private final ScheduledExecutorService scheduler;
private final ExecutorService consumerExecutor;
private final StatsFactory statsFactory;
private final TbRuleEngineQueueFactory queueFactory;
private final QueueKey key;
private final ReentrantLock lock = new ReentrantLock(); //NonfairSync
private final ConcurrentMap<TopicPartitionInfo, TbQueueConsumerLauncher> consumers = new ConcurrentHashMap<>();
private final TbRuleEngineConsumerStats stats;
private volatile Set<TopicPartitionInfo> partitions = Collections.emptySet();
private volatile Queue queue;
private volatile TbQueueConsumerLauncher mainConsumer;
private final java.util.Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();
public TbRuleEngineQueueConsumerManager(ScheduledExecutorService scheduler, ExecutorService consumerExecutor, StatsFactory statsFactory, TbRuleEngineQueueFactory queueFactory, QueueKey key) {
this.scheduler = scheduler;
this.consumerExecutor = consumerExecutor;
this.statsFactory = statsFactory;
this.queueFactory = queueFactory;
this.key = key;
this.stats = new TbRuleEngineConsumerStats(key, statsFactory);
}
public void init(Queue queue) {
processTask(new TbQueueConsumerManagerTask(ComponentLifecycleEvent.CREATED, queue, null));
}
private void processTask(TbQueueConsumerManagerTask todo) {
tasks.add(todo);
log.info("[{}] Adding task: {}", key, todo);
tryProcessTasks();
}
private void tryProcessTasks() {
consumerExecutor.submit(() -> {
if (lock.tryLock()) {
try {
TbQueueConsumerManagerTask lastUpdateTask = null;
while (!tasks.isEmpty()) {
TbQueueConsumerManagerTask task = tasks.poll();
switch (task.getEvent()) {
case CREATED:
doInit(task.getQueue());
break;
case UPDATED:
lastUpdateTask = task;
break;
case DELETED:
lastUpdateTask = null;
doDelete();
break;
}
}
if (lastUpdateTask != null) {
doUpdate(lastUpdateTask.getQueue(), lastUpdateTask.getPartitions());
}
} finally {
lock.unlock();
}
} else {
log.debug("[{}] Failed to acquire lock.", key);
scheduler.schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS);
}
});
}
public void doInit(Queue queue) {
log.info("[{}] Init consumer with queue: {}", key, queue);
this.queue = queue;
if (!queue.isConsumerPerPartition()) {
mainConsumer = new TbQueueConsumerLauncher(queueFactory.createToRuleEngineMsgConsumer(queue));
}
}
private void doUpdate(Queue newQueue, Set<TopicPartitionInfo> partitions) {
if (newQueue.isConsumerPerPartition()) {
} else {
for (var oldConsumer : consumers.values()) {
oldConsumer.stop();
}
for (var oldConsumer : consumers.entrySet()) {
try {
oldConsumer.getValue().awaitStopped();
} catch (Exception e) {
log.info("[{}][{}] Failed to stop the consumer during update", key, oldConsumer.getKey().getPartition().orElse(-1), e);
}
}
if (mainConsumer == null) {
mainConsumer = new TbQueueConsumerLauncher(queueFactory.createToRuleEngineMsgConsumer(queue));
//TODO: launch
}
mainConsumer.subscribe(partitions);
}
}
private void doDelete() {
}
public void subscribe(PartitionChangeEvent event) {
log.info("[{}] Subscribing to partitions: {}", key, event.getPartitions());
if (!queue.isConsumerPerPartition()) {
mainConsumer.subscribe(event.getPartitions());
} else {
log.info("[{}] Subscribing consumer per partition: {}", key, event.getPartitions());
subscribeConsumerPerPartition(event.getQueueKey(), event.getPartitions());
}
}
void subscribeConsumerPerPartition(QueueKey queue, Set<TopicPartitionInfo> partitions) {
topicsConsumerPerPartition.get(queue).getSubscribeQueue().add(partitions);
scheduleTopicRepartition(queue);
}
private void scheduleTopicRepartition(QueueKey queue) {
repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS);
}
void repartitionTopicWithConsumerPerPartition(final QueueKey queueKey) {
if (stopped) {
return;
}
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queueKey);
java.util.Queue<Set<TopicPartitionInfo>> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue();
if (subscribeQueue.isEmpty()) {
return;
}
if (tbTopicWithConsumerPerPartition.getLock().tryLock()) {
try {
Set<TopicPartitionInfo> partitions = null;
while (!subscribeQueue.isEmpty()) {
partitions = subscribeQueue.poll();
}
if (partitions == null) {
return;
}
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>> consumers = tbTopicWithConsumerPerPartition.getConsumers();
addedPartitions.removeAll(consumers.keySet());
log.info("calculated addedPartitions {}", addedPartitions);
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("calculated removedPartitions {}", removedPartitions);
removedPartitions.forEach((tpi) -> {
removeConsumerForTopicByTpi(queueKey.getQueueName(), consumers, tpi);
});
addedPartitions.forEach((tpi) -> {
log.info("[{}] Adding consumer for topic: {}", queueKey, tpi);
Queue configuration = consumerConfigurations.get(queueKey);
TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration);
consumers.put(tpi, consumer);
launchConsumer(consumer, queueKey, tpi.getFullTopicName(), queueKey + "-" + tpi.getPartition().orElse(-999999));
consumer.subscribe(Collections.singleton(tpi));
});
} finally {
tbTopicWithConsumerPerPartition.getLock().unlock();
}
} else {
scheduleTopicRepartition(queueKey); //reschedule later
}
}
public void stop() {
// consumers.values().forEach(TbQueueConsumer::stop);
// topicsConsumerPerPartition.values().forEach(tbTopicWithConsumerPerPartition -> tbTopicWithConsumerPerPartition.getConsumers().keySet()
// .forEach((tpi) -> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi)));
}
}

View File

@ -82,7 +82,6 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
* @return * @return
* @param configuration * @param configuration
*/ */
//TODO 2.5 ybondarenko: make sure you use queueName to distinct consumers where necessary
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration); TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration);
/** /**