Don't interrupt consumer loop task on stop

This commit is contained in:
ViacheslavKlimov 2023-10-19 15:29:46 +03:00
parent 049f40a62e
commit 2d1640a002
2 changed files with 6 additions and 35 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.queue.ruleengine;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
@ -24,7 +25,6 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,13 +37,8 @@ public class TbQueueConsumerTask {
@Getter @Getter
private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer; private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer;
@Setter
private Future<?> task; private Future<?> task;
private CountDownLatch completionLatch;
public void setTask(Future<?> task) {
this.completionLatch = new CountDownLatch(1);
this.task = task;
}
public void subscribe(Set<TopicPartitionInfo> partitions) { public void subscribe(Set<TopicPartitionInfo> partitions) {
log.trace("[{}] Subscribing to partitions: {}", key, partitions); log.trace("[{}] Subscribing to partitions: {}", key, partitions);
@ -53,23 +48,18 @@ public class TbQueueConsumerTask {
public void initiateStop() { public void initiateStop() {
log.debug("[{}] Initiating stop", key); log.debug("[{}] Initiating stop", key);
consumer.stop(); consumer.stop();
if (isRunning()) {
task.cancel(true);
}
} }
public void awaitCompletion() { public void awaitCompletion() {
log.trace("[{}] Awaiting finish", key); log.trace("[{}] Awaiting finish", key);
if (isRunning()) { if (isRunning()) {
try { try {
if (!completionLatch.await(30, TimeUnit.SECONDS)) { task.get(30, TimeUnit.SECONDS);
task = null;
throw new IllegalStateException("timeout of 30 seconds expired");
}
log.trace("[{}] Awaited finish", key); log.trace("[{}] Awaited finish", key);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] Failed to await for consumer to stop", key, e); log.warn("[{}] Failed to await for consumer to stop", key, e);
} }
task = null;
} }
} }
@ -77,9 +67,4 @@ public class TbQueueConsumerTask {
return task != null; return task != null;
} }
public void finished() {
completionLatch.countDown();
task = null;
}
} }

View File

@ -242,7 +242,6 @@ public class TbRuleEngineQueueConsumerManager {
} catch (Throwable e) { } catch (Throwable e) {
log.error("Failure in consumer loop", e); log.error("Failure in consumer loop", e);
} }
consumerTask.finished();
}); });
consumerTask.setTask(consumerLoop); consumerTask.setTask(consumerLoop);
} }
@ -266,7 +265,7 @@ public class TbRuleEngineQueueConsumerManager {
} }
} }
} }
if (Thread.interrupted() || consumer.isStopped()) { if (consumer.isStopped()) {
consumer.unsubscribe(); consumer.unsubscribe();
} }
log.info("Rule Engine consumer stopped"); log.info("Rule Engine consumer stopped");
@ -282,7 +281,7 @@ public class TbRuleEngineQueueConsumerManager {
TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs()); TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg)); submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg));
final boolean timeout = !awaitPackProcessing(packCtx, queue.getPackProcessingTimeout(), true); final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx); TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx);
if (timeout) { if (timeout) {
@ -310,19 +309,6 @@ public class TbRuleEngineQueueConsumerManager {
} }
} }
private boolean awaitPackProcessing(TbMsgPackProcessingContext packCtx, long processingTimeout, boolean ignoreInterrupt) throws InterruptedException {
try {
return packCtx.await(processingTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (ignoreInterrupt) {
log.debug("Interrupt happened while waiting for pack processing, trying to await one more time");
return awaitPackProcessing(packCtx, processingTimeout, false);
} else {
throw new RuntimeException("Failed to await pack processing due to thread interrupt", e);
}
}
}
private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) { private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) {
return ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy()); return ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy());
} }