From 38fb68b2001935b90908a19dd9be39bf43f515a5 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 19 May 2022 16:14:58 +0200 Subject: [PATCH] fixed stoping RE consumers --- .../service/queue/DefaultTbRuleEngineConsumerService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 1844929d35..8496fa7827 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -75,6 +75,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -268,7 +269,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration); final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration); submitStrategy.init(msgs); - while (!stopped) { + while (!stopped && !consumer.isStopped()) { TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs()); submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));