From e59460b42bbcf69289aa2bdf8851e98472e911d1 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 2 May 2025 15:14:00 +0300 Subject: [PATCH] Task processors partitioning --- .../server/service/job/DefaultJobManager.java | 15 +++-- .../service/job/task/DummyTaskProcessor.java | 4 +- .../src/main/resources/thingsboard.yml | 11 +++- .../server/service/job/JobManagerTest.java | 9 ++- .../server/service/job/TestTaskProcessor.java | 23 ++++++++ .../server/common/msg/queue/ServiceType.java | 3 +- common/proto/src/main/proto/queue.proto | 1 + .../DefaultTbServiceInfoProvider.java | 17 +++++- .../queue/discovery/HashPartitionService.java | 17 +++++- .../server/queue/task/TaskProcessor.java | 47 ++++++++------- .../queue/task/TaskProcessorExecutors.java | 59 +++++++++++++++++++ 11 files changed, 167 insertions(+), 39 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/job/TestTaskProcessor.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorExecutors.java diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 3da2e2a9ab..79312ba608 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo; import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter; import org.thingsboard.server.common.data.notification.template.NotificationTemplate; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.notification.DefaultNotifications; @@ -47,6 +48,7 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; +import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.task.JobStatsService; import org.thingsboard.server.queue.util.AfterStartUp; @@ -70,20 +72,22 @@ public class DefaultJobManager implements JobManager { private final JobService jobService; private final JobStatsService jobStatsService; private final NotificationCenter notificationCenter; + private final PartitionService partitionService; private final Map jobProcessors; private final Map>> taskProducers; private final QueueConsumerManager> jobStatsConsumer; private final ExecutorService executor; private final ExecutorService consumerExecutor; - @Value("${queue.tasks.stats.processing_interval_ms:5000}") + @Value("${queue.tasks.stats.processing_interval_ms:1000}") private int statsProcessingInterval; public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter, - TbCoreQueueFactory queueFactory, List jobProcessors) { + PartitionService partitionService, TbCoreQueueFactory queueFactory, List jobProcessors) { this.jobService = jobService; this.jobStatsService = jobStatsService; this.notificationCenter = notificationCenter; + this.partitionService = partitionService; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); @@ -199,8 +203,8 @@ public class DefaultJobManager implements JobManager { .build(); TbQueueProducer> producer = taskProducers.get(task.getJobType()); - TbProtoQueueMsg msg = new TbProtoQueueMsg<>(task.getTenantId().getId(), taskProto); // one job at a time for a given tenant - producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, new TbQueueCallback() { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TASK_PROCESSOR, task.getJobType().name(), task.getTenantId(), task.getTenantId()); // one job at a time for a given tenant + producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), taskProto), new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { log.trace("Submitted task: {}", task); @@ -249,7 +253,7 @@ public class DefaultJobManager implements JobManager { private void sendJobFinishedNotification(Job job) { NotificationTemplate template = DefaultNotifications.DefaultNotification.builder() .name("Job finished") - .subject("${type} ${status}") + .subject("${type} task ${status}") .text("${description} ${status}: ${result}") .build().toTemplate(); GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of( @@ -258,6 +262,7 @@ public class DefaultJobManager implements JobManager { "status", job.getStatus().name().toLowerCase(), "result", job.getResult().getDescription() )); + // todo: button to see details (forward to jobs page) notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 5178461746..564142ff2b 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -16,13 +16,11 @@ package org.thingsboard.server.service.job.task; import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.job.task.DummyTask; import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.job.task.DummyTask; import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.queue.task.TaskProcessor; -@Component @RequiredArgsConstructor public class DummyTaskProcessor extends TaskProcessor { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 322b037e2d..7a7e117120 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1628,6 +1628,11 @@ queue: - key: max.poll.records # Max poll records for edqs.state topic value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}" + tasks: + # Key-value properties for Kafka consumer for tasks topics + - key: max.poll.records + # Max poll records for tasks topics + value: "${TB_QUEUE_KAFKA_TASKS_MAX_POLL_RECORDS:1}" other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms @@ -1668,7 +1673,7 @@ queue: # Kafka properties for EDQS state topic (infinite retention, compaction) edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" # Kafka properties for tasks topics - tasks: "${TB_QUEUE_KAFKA_TASKS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600;partitions:100;min.insync.replicas:1}" + tasks: "${TB_QUEUE_KAFKA_TASKS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600;partitions:1;min.insync.replicas:1}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" @@ -1884,9 +1889,11 @@ queue: # Statistics printing interval for Edge services print-interval-ms: "${TB_QUEUE_EDGE_STATS_PRINT_INTERVAL_MS:60000}" tasks: + # Partitions count for tasks queues + partitions: "${TB_QUEUE_TASKS_PARTITIONS:12}" stats: # Interval in milliseconds to process job stats - processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:5000}" + processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}" # Event configuration parameters event: diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index 39c87dfd70..796a38ce4f 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -39,7 +39,6 @@ import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.queue.task.JobStatsService; -import org.thingsboard.server.service.job.task.DummyTaskProcessor; import java.util.ArrayList; import java.util.List; @@ -66,7 +65,7 @@ public class JobManagerTest extends AbstractControllerTest { private JobService jobService; @SpyBean - private DummyTaskProcessor taskProcessor; + private TestTaskProcessor taskProcessor; @SpyBean private JobStatsService jobStatsService; @@ -109,7 +108,7 @@ public class JobManagerTest extends AbstractControllerTest { }); checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job completed"); + assertThat(notification.getSubject()).isEqualTo("Dummy job task completed"); assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed"); }); } @@ -145,7 +144,7 @@ public class JobManagerTest extends AbstractControllerTest { }); checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job failed"); + assertThat(notification.getSubject()).isEqualTo("Dummy job task failed"); assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed"); }); } @@ -340,7 +339,7 @@ public class JobManagerTest extends AbstractControllerTest { }); checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job failed"); + assertThat(notification.getSubject()).isEqualTo("Dummy job task failed"); assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks"); }); } diff --git a/application/src/test/java/org/thingsboard/server/service/job/TestTaskProcessor.java b/application/src/test/java/org/thingsboard/server/service/job/TestTaskProcessor.java new file mode 100644 index 0000000000..fdaec648b5 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/job/TestTaskProcessor.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.job; + +import org.springframework.stereotype.Component; +import org.thingsboard.server.service.job.task.DummyTaskProcessor; + +@Component +public class TestTaskProcessor extends DummyTaskProcessor { +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceType.java index f31fdfa7a8..8fd535891c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceType.java @@ -27,7 +27,8 @@ public enum ServiceType { TB_TRANSPORT("TB Transport"), JS_EXECUTOR("JS Executor"), TB_VC_EXECUTOR("TB VC Executor"), - EDQS("TB Entity Data Query Service"); + EDQS("TB Entity Data Query Service"), + TASK_PROCESSOR("Task Processor"); private final String label; diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index b4d436a32a..1cc507c5b3 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -88,6 +88,7 @@ message ServiceInfo { SystemInfoProto systemInfo = 10; repeated string assignedTenantProfiles = 11; string label = 12; + repeated string taskTypes = 13; } message SystemInfoProto { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 609d3f8eee..4325651540 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -24,11 +24,13 @@ import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbTransportService; +import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import org.thingsboard.server.queue.edqs.EdqsConfig; +import org.thingsboard.server.queue.task.TaskProcessor; import org.thingsboard.server.queue.util.AfterContextReady; import java.net.InetAddress; @@ -40,7 +42,12 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import static org.thingsboard.common.util.SystemUtil.*; +import static org.thingsboard.common.util.SystemUtil.getCpuCount; +import static org.thingsboard.common.util.SystemUtil.getCpuUsage; +import static org.thingsboard.common.util.SystemUtil.getDiscSpaceUsage; +import static org.thingsboard.common.util.SystemUtil.getMemoryUsage; +import static org.thingsboard.common.util.SystemUtil.getTotalDiscSpace; +import static org.thingsboard.common.util.SystemUtil.getTotalMemory; @Component @@ -65,7 +72,11 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { @Autowired private ApplicationContext applicationContext; + @Autowired + private List> availableTaskProcessors; + private List serviceTypes; + private List taskTypes; private ServiceInfo serviceInfo; @PostConstruct @@ -91,6 +102,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { edqsConfig.setLabel(serviceId); } } + taskTypes = availableTaskProcessors.stream() + .map(TaskProcessor::getJobType) + .toList(); generateNewServiceInfoWithCurrentSystemInfo(); } @@ -128,6 +142,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList())); } builder.setLabel(edqsConfig.getLabel()); + builder.addAllTaskTypes(taskTypes.stream().map(JobType::name).toList()); return serviceInfo = builder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 7186bf7055..ccbfbc9c79 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -82,6 +83,8 @@ public class HashPartitionService implements PartitionService { private Integer edgePartitions; @Value("${queue.edqs.partitions:12}") private Integer edqsPartitions; + @Value("${queue.tasks.partitions:12}") + private Integer tasksPartitions; @Value("${queue.partitions.hash_function_name:murmur3_128}") private String hashFunctionName; @@ -140,6 +143,12 @@ public class HashPartitionService implements PartitionService { QueueKey edqsKey = new QueueKey(ServiceType.EDQS); partitionSizesMap.put(edqsKey, edqsPartitions); partitionTopicsMap.put(edqsKey, "edqs"); // placeholder, not used + + for (JobType jobType : JobType.values()) { + QueueKey queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, jobType.name()); + partitionSizesMap.put(queueKey, tasksPartitions); + partitionTopicsMap.put(queueKey, jobType.getTasksTopic()); + } } @AfterStartUp(order = AfterStartUp.QUEUE_INFO_INITIALIZATION) @@ -454,8 +463,8 @@ public class HashPartitionService implements PartitionService { if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { partitionSizesMap.keySet().stream() .filter(queueKey -> queueKey.getType() == ServiceType.TB_RULE_ENGINE && - !queueKey.getTenantId().isSysTenantId() && - !newPartitions.containsKey(queueKey)) + !queueKey.getTenantId().isSysTenantId() && + !newPartitions.containsKey(queueKey)) .forEach(removed::add); } removed.forEach(queueKey -> { @@ -675,6 +684,10 @@ public class HashPartitionService implements PartitionService { for (String transportType : instance.getTransportsList()) { tbTransportServicesByType.computeIfAbsent(transportType, t -> new ArrayList<>()).add(instance); } + for (String taskType : instance.getTaskTypesList()) { + QueueKey queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, taskType); + queueServiceList.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(instance); + } } @NotNull diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 643e7b27bc..82f13ce66b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -22,26 +22,27 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; +import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory; -import org.thingsboard.server.queue.util.AfterStartUp; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; public abstract class TaskProcessor, R extends TaskResult> { @@ -51,29 +52,35 @@ public abstract class TaskProcessor, R extends TaskResult> { private TaskProcessorQueueFactory queueFactory; @Autowired private JobStatsService statsService; + @Autowired + private TaskProcessorExecutors executors; - private QueueConsumerManager> taskConsumer; - private ExecutorService consumerExecutor; + private QueueKey queueKey; + private MainQueueConsumerManager, QueueConfig> taskConsumer; private final Set deletedTenants = ConcurrentHashMap.newKeySet(); private final Set discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine @PostConstruct public void init() { - consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-consumer")); - taskConsumer = QueueConsumerManager.>builder() // fixme: should be consumer per partition - .name(getJobType().name().toLowerCase() + "-tasks") - .msgPackProcessor(this::processMsgs) // todo: max.poll.records = 1 - .pollInterval(125) - .consumerCreator(() -> queueFactory.createTaskConsumer(getJobType())) - .consumerExecutor(consumerExecutor) + queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name()); + taskConsumer = MainQueueConsumerManager., QueueConfig>builder() + .queueKey(queueKey) + .config(QueueConfig.of(true, 500)) + .msgPackProcessor(this::processMsgs) + .consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType())) + .consumerExecutor(executors.getConsumersExecutor()) + .scheduler(executors.getScheduler()) + .taskExecutor(executors.getMgmtExecutor()) .build(); } - @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) - public void afterStartUp() { - taskConsumer.subscribe(); - taskConsumer.launch(); + @EventListener + public void onPartitionChangeEvent(PartitionChangeEvent event) { + if (event.getServiceType() == ServiceType.TASK_PROCESSOR) { + Set partitions = event.getNewPartitions().get(queueKey); + taskConsumer.update(partitions); + } } @EventListener @@ -95,7 +102,7 @@ public abstract class TaskProcessor, R extends TaskResult> { } } - private void processMsgs(List> msgs, TbQueueConsumer> consumer) throws Exception { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig queueConfig) throws Exception { for (TbProtoQueueMsg msg : msgs) { try { @SuppressWarnings("unchecked") @@ -159,7 +166,7 @@ public abstract class TaskProcessor, R extends TaskResult> { @PreDestroy public void destroy() { taskConsumer.stop(); - consumerExecutor.shutdownNow(); + taskConsumer.awaitStop(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorExecutors.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorExecutors.java new file mode 100644 index 0000000000..3aa6a0f004 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorExecutors.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.task; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Getter +@Lazy +@Component +public class TaskProcessorExecutors { + + private ExecutorService consumersExecutor; + private ExecutorService mgmtExecutor; + private ScheduledExecutorService scheduler; + + @PostConstruct + private void init() { + consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("task-consumer")); + mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "task-consumer-mgmt"); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("task-consumer-scheduler"); + } + + @PreDestroy + private void destroy() { + if (consumersExecutor != null) { + consumersExecutor.shutdownNow(); + } + if (mgmtExecutor != null) { + mgmtExecutor.shutdownNow(); + } + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + +}