Configurable tasks poll interval; task processing timing

This commit is contained in:
ViacheslavKlimov 2025-05-08 16:49:33 +03:00
parent f5e816923d
commit 8c959232d4
2 changed files with 13 additions and 4 deletions

View File

@ -1898,6 +1898,8 @@ queue:
# Statistics printing interval for Edge services
print-interval-ms: "${TB_QUEUE_EDGE_STATS_PRINT_INTERVAL_MS:60000}"
tasks:
# Poll interval in milliseconds for tasks topics
poll_interval: "${TB_QUEUE_TASKS_POLL_INTERVAL_MS:500}"
# Partitions count for tasks queues
partitions: "${TB_QUEUE_TASKS_PARTITIONS:12}"
# Custom partitions count for tasks queues per type. Format: 'TYPE1:24;TYPE2:36', e.g. 'CF_REPROCESSING:24;TENANT_EXPORT:6'

View File

@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.SetCache;
@ -61,6 +62,9 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
@Autowired
private TaskProcessorExecutors executors;
@Value("${queue.tasks.poll_interval:500}")
private int pollInterval;
private QueueKey queueKey;
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer;
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor"));
@ -73,7 +77,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name());
taskConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TaskProto>, QueueConfig>builder()
.queueKey(queueKey)
.config(QueueConfig.of(true, 500))
.config(QueueConfig.of(true, pollInterval))
.msgPackProcessor(this::processMsgs)
.consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType()))
.consumerExecutor(executors.getConsumersExecutor())
@ -96,14 +100,14 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
switch (entityId.getEntityType()) {
case JOB -> {
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
log.debug("Adding job {} to discarded", entityId);
log.info("Adding job {} to discarded", entityId);
addToDiscardedJobs(entityId.getId());
}
}
case TENANT -> {
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
deletedTenants.add(entityId.getId());
log.debug("Adding tenant {} to deleted", entityId);
log.info("Adding tenant {} to deleted", entityId);
}
}
}
@ -134,9 +138,10 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
private void processTask(T task) throws InterruptedException {
task.setAttempt(task.getAttempt() + 1);
log.info("Processing task: {}", task);
log.debug("Processing task: {}", task);
Future<R> future = null;
try {
long startNs = System.nanoTime();
future = taskExecutor.submit(() -> process(task));
R result;
try {
@ -146,6 +151,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
} catch (TimeoutException e) {
throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms");
}
long timingNs = System.nanoTime() - startNs;
log.info("Processed task in {} ms: {}", timingNs / 1000000.0, task);
reportTaskResult(task, result);
} catch (InterruptedException e) {
throw e;