Minor renaming of variables and code cleanup

This commit is contained in:
Andrii Shvaika 2021-07-06 13:50:26 +03:00
parent 026329f812
commit 25cac25731
2 changed files with 15 additions and 18 deletions

View File

@ -82,7 +82,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
public static final String SUCCESSFUL_STATUS = "successful";
public static final String FAILED_STATUS = "failed";
public static final String THREAD_TOPIC_SPLITERATOR = " | ";
public static final String THREAD_TOPIC_SEPARATOR = " | ";
@Value("${queue.rule-engine.poll-interval}")
private long pollDuration;
@Value("${queue.rule-engine.pack-processing-timeout}")
@ -145,9 +145,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@PreDestroy
public void stop() {
super.destroy();
if (submitExecutor != null) {
submitExecutor.shutdownNow();
}
ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config));
}
@ -174,11 +172,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS);
}
void repartitionTopicWithConsumerPerPartition(final String queue) {
void repartitionTopicWithConsumerPerPartition(final String queueName) {
if (stopped) {
return;
}
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queue);
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queueName);
Queue<Set<TopicPartitionInfo>> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue();
if (subscribeQueue.isEmpty()) {
return;
@ -203,16 +201,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
log.info("calculated removedPartitions {}", removedPartitions);
removedPartitions.forEach((tpi) -> {
removeConsumerForTopicByTpi(queue, consumers, tpi);
removeConsumerForTopicByTpi(queueName, consumers, tpi);
});
addedPartitions.forEach((tpi) -> {
log.info("[{}] Adding consumer for topic: {}", queue, tpi);
TbRuleEngineQueueConfiguration configuration = consumerConfigurations.get(queue);
//consumerStats.computeIfAbsent(queue, queueName -> new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); //already created on init
log.info("[{}] Adding consumer for topic: {}", queueName, tpi);
TbRuleEngineQueueConfiguration configuration = consumerConfigurations.get(queueName);
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration);
consumers.put(tpi, consumer);
launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), "" + queue + "-" + tpi.getPartition().orElse(-999999));
launchConsumer(consumer, consumerConfigurations.get(queueName), consumerStats.get(queueName), "" + queueName + "-" + tpi.getPartition().orElse(-999999));
consumer.subscribe(Collections.singleton(tpi));
});
@ -220,7 +217,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
tbTopicWithConsumerPerPartition.getLock().unlock();
}
} else {
scheduleTopicRepartition(queue); //reschedule later
scheduleTopicRepartition(queueName); //reschedule later
}
}
@ -240,7 +237,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
protected void stopMainConsumers() {
consumers.values().forEach(TbQueueConsumer::unsubscribe);
topicsConsumerPerPartition.values().forEach(tbTopicWithConsumerPerPartition -> tbTopicWithConsumerPerPartition.getConsumers().keySet()
.forEach((tpi)-> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi)));
.forEach((tpi) -> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi)));
}
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
@ -304,11 +301,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
void updateCurrentThreadName(String threadSuffix) {
String name = Thread.currentThread().getName();
int spliteratorIndex = name.indexOf(THREAD_TOPIC_SPLITERATOR);
int spliteratorIndex = name.indexOf(THREAD_TOPIC_SEPARATOR);
if (spliteratorIndex > 0) {
name = name.substring(0, spliteratorIndex);
}
name = name + THREAD_TOPIC_SPLITERATOR + threadSuffix;
name = name + THREAD_TOPIC_SEPARATOR + threadSuffix;
Thread.currentThread().setName(name);
}

View File

@ -866,7 +866,7 @@ queue:
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:false}"
consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:true}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
@ -883,7 +883,7 @@ queue:
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:false}"
consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:true}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
@ -900,7 +900,7 @@ queue:
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:false}"
consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:true}"
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL