diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 2e3a4676e8..9c0df63ad5 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -41,8 +41,8 @@ public class TbQueueConsumerTask { private CountDownLatch completionLatch; public void setTask(Future task) { - this.task = task; this.completionLatch = new CountDownLatch(1); + this.task = task; } public void subscribe(Set partitions) { @@ -63,14 +63,14 @@ public class TbQueueConsumerTask { if (isRunning()) { try { if (!completionLatch.await(30, TimeUnit.SECONDS)) { + task = null; throw new IllegalStateException("timeout of 30 seconds expired"); } + log.trace("[{}] Awaited finish", key); } catch (Exception e) { log.warn("[{}] Failed to await for consumer to stop", key, e); } - task = null; } - log.trace("[{}] Awaited finish", key); } public boolean isRunning() { @@ -79,6 +79,7 @@ public class TbQueueConsumerTask { public void finished() { completionLatch.countDown(); + task = null; } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 3a7b2f15d8..71a7c45d45 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -237,7 +237,11 @@ public class TbRuleEngineQueueConsumerManager { log.info("[{}] Launching consumer", consumerTask.getKey()); Future consumerLoop = ctx.getConsumersExecutor().submit(() -> { ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString()); - consumerLoop(consumerTask.getConsumer()); + try { + consumerLoop(consumerTask.getConsumer()); + } catch (Throwable e) { + log.error("Failure in consumer loop", e); + } consumerTask.finished(); }); consumerTask.setTask(consumerLoop); @@ -262,7 +266,7 @@ public class TbRuleEngineQueueConsumerManager { } } } - if (consumer.isStopped()) { + if (Thread.interrupted() || consumer.isStopped()) { consumer.unsubscribe(); } log.info("Rule Engine consumer stopped");