diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 9c0df63ad5..59d55285f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.queue.ruleengine; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -24,7 +25,6 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -37,13 +37,8 @@ public class TbQueueConsumerTask { @Getter private final TbQueueConsumer> consumer; + @Setter private Future task; - private CountDownLatch completionLatch; - - public void setTask(Future task) { - this.completionLatch = new CountDownLatch(1); - this.task = task; - } public void subscribe(Set partitions) { log.trace("[{}] Subscribing to partitions: {}", key, partitions); @@ -53,23 +48,18 @@ public class TbQueueConsumerTask { public void initiateStop() { log.debug("[{}] Initiating stop", key); consumer.stop(); - if (isRunning()) { - task.cancel(true); - } } public void awaitCompletion() { log.trace("[{}] Awaiting finish", key); if (isRunning()) { try { - if (!completionLatch.await(30, TimeUnit.SECONDS)) { - task = null; - throw new IllegalStateException("timeout of 30 seconds expired"); - } + task.get(30, TimeUnit.SECONDS); log.trace("[{}] Awaited finish", key); } catch (Exception e) { log.warn("[{}] Failed to await for consumer to stop", key, e); } + task = null; } } @@ -77,9 +67,4 @@ public class TbQueueConsumerTask { return task != null; } - public void finished() { - completionLatch.countDown(); - task = null; - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 71a7c45d45..5a59cb124e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -242,7 +242,6 @@ public class TbRuleEngineQueueConsumerManager { } catch (Throwable e) { log.error("Failure in consumer loop", e); } - consumerTask.finished(); }); consumerTask.setTask(consumerLoop); } @@ -266,7 +265,7 @@ public class TbRuleEngineQueueConsumerManager { } } } - if (Thread.interrupted() || consumer.isStopped()) { + if (consumer.isStopped()) { consumer.unsubscribe(); } log.info("Rule Engine consumer stopped"); @@ -282,7 +281,7 @@ public class TbRuleEngineQueueConsumerManager { TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs()); submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg)); - final boolean timeout = !awaitPackProcessing(packCtx, queue.getPackProcessingTimeout(), true); + final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS); TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx); if (timeout) { @@ -310,19 +309,6 @@ public class TbRuleEngineQueueConsumerManager { } } - private boolean awaitPackProcessing(TbMsgPackProcessingContext packCtx, long processingTimeout, boolean ignoreInterrupt) throws InterruptedException { - try { - return packCtx.await(processingTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - if (ignoreInterrupt) { - log.debug("Interrupt happened while waiting for pack processing, trying to await one more time"); - return awaitPackProcessing(packCtx, processingTimeout, false); - } else { - throw new RuntimeException("Failed to await pack processing due to thread interrupt", e); - } - } - } - private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) { return ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy()); }