Clear interruption status after exiting consumer while loop

This commit is contained in:
ViacheslavKlimov 2023-10-19 14:00:04 +03:00
parent 5108e98f38
commit 049f40a62e
2 changed files with 10 additions and 5 deletions

View File

@ -41,8 +41,8 @@ public class TbQueueConsumerTask {
private CountDownLatch completionLatch; private CountDownLatch completionLatch;
public void setTask(Future<?> task) { public void setTask(Future<?> task) {
this.task = task;
this.completionLatch = new CountDownLatch(1); this.completionLatch = new CountDownLatch(1);
this.task = task;
} }
public void subscribe(Set<TopicPartitionInfo> partitions) { public void subscribe(Set<TopicPartitionInfo> partitions) {
@ -63,14 +63,14 @@ public class TbQueueConsumerTask {
if (isRunning()) { if (isRunning()) {
try { try {
if (!completionLatch.await(30, TimeUnit.SECONDS)) { if (!completionLatch.await(30, TimeUnit.SECONDS)) {
task = null;
throw new IllegalStateException("timeout of 30 seconds expired"); throw new IllegalStateException("timeout of 30 seconds expired");
} }
log.trace("[{}] Awaited finish", key);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] Failed to await for consumer to stop", key, e); log.warn("[{}] Failed to await for consumer to stop", key, e);
} }
task = null;
} }
log.trace("[{}] Awaited finish", key);
} }
public boolean isRunning() { public boolean isRunning() {
@ -79,6 +79,7 @@ public class TbQueueConsumerTask {
public void finished() { public void finished() {
completionLatch.countDown(); completionLatch.countDown();
task = null;
} }
} }

View File

@ -237,7 +237,11 @@ public class TbRuleEngineQueueConsumerManager {
log.info("[{}] Launching consumer", consumerTask.getKey()); log.info("[{}] Launching consumer", consumerTask.getKey());
Future<?> consumerLoop = ctx.getConsumersExecutor().submit(() -> { Future<?> consumerLoop = ctx.getConsumersExecutor().submit(() -> {
ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString()); ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString());
consumerLoop(consumerTask.getConsumer()); try {
consumerLoop(consumerTask.getConsumer());
} catch (Throwable e) {
log.error("Failure in consumer loop", e);
}
consumerTask.finished(); consumerTask.finished();
}); });
consumerTask.setTask(consumerLoop); consumerTask.setTask(consumerLoop);
@ -262,7 +266,7 @@ public class TbRuleEngineQueueConsumerManager {
} }
} }
} }
if (consumer.isStopped()) { if (Thread.interrupted() || consumer.isStopped()) {
consumer.unsubscribe(); consumer.unsubscribe();
} }
log.info("Rule Engine consumer stopped"); log.info("Rule Engine consumer stopped");