rule-engine - fixed name for reusable threads

This commit is contained in:
Sergey Matvienko 2021-06-15 15:22:30 +03:00 committed by Sergey Matvienko
parent 369844295f
commit 0d94b6231e

View File

@ -82,6 +82,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
public static final String SUCCESSFUL_STATUS = "successful"; public static final String SUCCESSFUL_STATUS = "successful";
public static final String FAILED_STATUS = "failed"; public static final String FAILED_STATUS = "failed";
public static final String THREAD_TOPIC_SPLITERATOR = " | ";
@Value("${queue.rule-engine.poll-interval}") @Value("${queue.rule-engine.poll-interval}")
private long pollDuration; private long pollDuration;
@Value("${queue.rule-engine.pack-processing-timeout}") @Value("${queue.rule-engine.pack-processing-timeout}")
@ -245,7 +246,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
Thread.currentThread().setName("" + Thread.currentThread().getName() + "-" + threadSuffix); updateCurrentThreadName(threadSuffix);
while (!stopped && !consumer.isStopped()) { while (!stopped && !consumer.isStopped()) {
try { try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration); List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
@ -299,6 +300,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
log.info("TB Rule Engine Consumer stopped."); log.info("TB Rule Engine Consumer stopped.");
} }
void updateCurrentThreadName(String threadSuffix) {
String name = Thread.currentThread().getName();
int spliteratorIndex = name.indexOf(THREAD_TOPIC_SPLITERATOR);
if (spliteratorIndex > 0) {
name = name.substring(0, spliteratorIndex);
}
name = name + THREAD_TOPIC_SPLITERATOR + threadSuffix;
Thread.currentThread().setName(name);
}
TbRuleEngineProcessingStrategy getAckStrategy(TbRuleEngineQueueConfiguration configuration) { TbRuleEngineProcessingStrategy getAckStrategy(TbRuleEngineQueueConfiguration configuration) {
return processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy()); return processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy());
} }