diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index df08c5a5c7..e7b0113dbe 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -193,7 +193,9 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor { if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { - getOrCreateConsumer(queueKey).update(partitions); + var consumer = getConsumer(queueKey).orElseGet(() -> { + Queue config = queueService.findQueueByTenantIdAndName(queueKey.getTenantId(), queueKey.getQueueName()); + return createConsumer(queueKey, config); + }); + consumer.update(partitions); } }); consumers.keySet().stream() @@ -190,9 +194,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); Queue queue = queueService.findQueueById(tenantId, queueId); - TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); - Queue oldQueue = consumerManager.getQueue(); - consumerManager.update(queue); + var consumer = getConsumer(queueKey).orElseGet(() -> createConsumer(queueKey, queue)); + Queue oldQueue = consumer.getQueue(); + consumer.update(queue); if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) { partitionsChanged = true; @@ -235,8 +239,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } - private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { - return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); + private Optional getConsumer(QueueKey queueKey) { + return Optional.ofNullable(consumers.get(queueKey)); + } + + private TbRuleEngineQueueConsumerManager createConsumer(QueueKey queueKey, Queue queue) { + var consumer = new TbRuleEngineQueueConsumerManager(ctx, queueKey); + consumers.put(queueKey, consumer); + consumer.init(queue); + return consumer; } private Optional removeConsumer(QueueKey queueKey) {