diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fd5a252cc5..b2d7700441 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -142,8 +142,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> usageStatsConsumer; private final TbQueueConsumer> firmwareStatesConsumer; + protected volatile ExecutorService consumersExecutor; protected volatile ExecutorService usageStatsExecutor; - private volatile ExecutorService firmwareStatesExecutor; public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, @@ -186,7 +186,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService implements TbRuleEngineConsumerService { private final TbRuleEngineConsumerContext ctx; + private final QueueService queueService; private final TbRuleEngineDeviceRpcService tbDeviceRpcService; private final ConcurrentMap consumers = new ConcurrentHashMap<>(); @@ -68,6 +72,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbRuleEngineDeviceRpcService tbDeviceRpcService, + QueueService queueService, TbDeviceProfileCache deviceProfileCache, TbAssetProfileCache assetProfileCache, TbTenantProfileCache tenantProfileCache, @@ -77,12 +82,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty()); this.ctx = ctx; this.tbDeviceRpcService = tbDeviceRpcService; + this.queueService = queueService; } @PostConstruct public void init() { - super.init("tb-rule-engine-notifications-consumer"); // TODO: restore init of the main consumer? - List queues = ctx.findAllQueues(); + super.init("tb-rule-engine-notifications-consumer"); + List queues = queueService.findAllQueues(); for (Queue configuration : queues) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { initConsumer(configuration); @@ -91,30 +97,38 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } private void initConsumer(Queue configuration) { - consumers.computeIfAbsent(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration), - key -> new TbRuleEngineQueueConsumerManager(ctx, key)).init(configuration); + getOrCreateConsumer(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration)).init(configuration); } @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { if (event.getServiceType().equals(getServiceType())) { - var consumer = consumers.get(event.getQueueKey()); - if (consumer != null) { - consumer.subscribe(event); - } else { - log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey()); - } + event.getPartitionsMap().forEach((queueKey, partitions) -> { + var consumer = consumers.get(queueKey); + if (consumer != null) { + consumer.update(partitions); + } else { + log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey); + } + }); } } + @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) + public void onApplicationEvent(ApplicationReadyEvent event) { + super.onApplicationEvent(event); + ctx.setReady(true); + } + @Override protected void launchMainConsumers() { consumers.values().forEach(TbRuleEngineQueueConsumerManager::launchMainConsumer); } @Override - protected void stopMainConsumers() { + protected void stopConsumers() { consumers.values().forEach(TbRuleEngineQueueConsumerManager::stop); + ctx.stop(); } @Override @@ -164,8 +178,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); String queueName = queueUpdateMsg.getQueueName(); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); - Queue queue = ctx.getQueueService().findQueueById(tenantId, queueId); - consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)).update(queue); + Queue queue = queueService.findQueueById(tenantId, queueId); + + TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); + Queue oldQueue = consumerManager.getQueue(); + consumerManager.update(queue); + + if (oldQueue != null && queue.getPartitions() == oldQueue.getPartitions()) { + return; + } } partitionService.updateQueue(queueUpdateMsg); @@ -177,15 +198,19 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< log.info("Received queue delete msg: [{}]", queueDeleteMsg); TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(); + } partitionService.removeQueue(queueDeleteMsg); - var manager = consumers.remove(queueKey); - if (manager != null) { - manager.delete(); - } partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } + private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { + return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); + } + @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") public void printStats() { if (ctx.isStatsEnabled()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index 45ccbcf32a..842b627c22 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -67,9 +67,9 @@ public class TbRuleEngineConsumerStats { private final String queueName; private final TenantId tenantId; - public TbRuleEngineConsumerStats(QueueKey queue, StatsFactory statsFactory) { - this.queueName = queue.getQueueName(); - this.tenantId = queue.getTenantId(); + public TbRuleEngineConsumerStats(QueueKey queueKey, StatsFactory statsFactory) { + this.queueName = queueKey.getQueueName(); + this.tenantId = queueKey.getTenantId(); this.statsFactory = statsFactory; String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index b59086a350..d5be1243b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -65,7 +65,6 @@ import java.util.stream.Collectors; @Slf4j public abstract class AbstractConsumerService extends TbApplicationEventListener { - protected volatile ExecutorService consumersExecutor; protected volatile ExecutorService notificationsConsumerExecutor; protected volatile boolean stopped = false; protected volatile boolean isReady = false; @@ -99,8 +98,7 @@ public abstract class AbstractConsumerService findAllQueues() { - return queueService.findAllQueues(); + private volatile boolean isReady = false; + + @PostConstruct + public void init() { + this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer")); + this.mgmtExecutor = Executors.newFixedThreadPool(mgmtThreadPoolSize, ThingsBoardThreadFactory.forName("tb-rule-engine-mgmt")); + this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler")); } - @PreDestroy public void stop() { - mgmtExecutor.shutdownNow(); - consumersExecutor.shutdownNow(); // TODO: shutdown or shutdownNow? - submitExecutor.shutdownNow(); scheduler.shutdownNow(); + consumersExecutor.shutdown(); + mgmtExecutor.shutdown(); + try { + mgmtExecutor.awaitTermination(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("Failed to await mgmtExecutor termination"); + } } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9fbfd2fc63..6f51413ccd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1303,7 +1303,9 @@ queue: pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. # After a queue is deleted (or profile's isolation option was disabled), Rule Engine will continue reading related topics during this period, before deleting the actual topics - topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:30}" + topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}" + # Size of the thread pool that handles management operations such as subscribe, unsubscribe, queue delete, etc. + management-thread-pool-size: "${TB_QUEUE_RULE_ENGINE_MGMT_THREAD_POOL_SIZE:12}" transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java index 845ac49f15..8418ba67d9 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java @@ -37,12 +37,14 @@ import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; import org.thingsboard.server.service.stats.DefaultRuleEngineStatisticsService; @@ -163,7 +165,7 @@ public class BaseQueueControllerTest extends AbstractControllerTest { tenantId, ruleEngineException ))); - TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(queue, statsFactory); + TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(new QueueKey(ServiceType.TB_RULE_ENGINE, queue), statsFactory); testStats.log(testProcessingResult, true); int queueStatsTtlDays = 14; @@ -215,7 +217,7 @@ public class BaseQueueControllerTest extends AbstractControllerTest { tenantId, ruleEngineException ))); - TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(queue, statsFactory); + TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(new QueueKey(ServiceType.TB_RULE_ENGINE, queue), statsFactory); testStats.log(testProcessingResult, true); ruleEngineStatisticsService.reportQueueStats(System.currentTimeMillis(), testStats);