fixed stoping RE consumers

This commit is contained in:
YevhenBondarenko 2022-05-19 16:14:58 +02:00
parent b8f2d6ee9c
commit 38fb68b200

View File

@ -75,6 +75,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -268,7 +269,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
submitStrategy.init(msgs);
while (!stopped) {
while (!stopped && !consumer.isStopped()) {
TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));