Task processors partitioning

This commit is contained in:
ViacheslavKlimov 2025-05-02 15:14:00 +03:00
parent 09e334660f
commit e59460b42b
11 changed files with 167 additions and 39 deletions

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo;
import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter;
import org.thingsboard.server.common.data.notification.template.NotificationTemplate;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.notification.DefaultNotifications;
@ -47,6 +48,7 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.queue.util.AfterStartUp;
@ -70,20 +72,22 @@ public class DefaultJobManager implements JobManager {
private final JobService jobService;
private final JobStatsService jobStatsService;
private final NotificationCenter notificationCenter;
private final PartitionService partitionService;
private final Map<JobType, JobProcessor> jobProcessors;
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService executor;
private final ExecutorService consumerExecutor;
@Value("${queue.tasks.stats.processing_interval_ms:5000}")
@Value("${queue.tasks.stats.processing_interval_ms:1000}")
private int statsProcessingInterval;
public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter,
TbCoreQueueFactory queueFactory, List<JobProcessor> jobProcessors) {
PartitionService partitionService, TbCoreQueueFactory queueFactory, List<JobProcessor> jobProcessors) {
this.jobService = jobService;
this.jobStatsService = jobStatsService;
this.notificationCenter = notificationCenter;
this.partitionService = partitionService;
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass());
@ -199,8 +203,8 @@ public class DefaultJobManager implements JobManager {
.build();
TbQueueProducer<TbProtoQueueMsg<TaskProto>> producer = taskProducers.get(task.getJobType());
TbProtoQueueMsg<TaskProto> msg = new TbProtoQueueMsg<>(task.getTenantId().getId(), taskProto); // one job at a time for a given tenant
producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, new TbQueueCallback() {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TASK_PROCESSOR, task.getJobType().name(), task.getTenantId(), task.getTenantId()); // one job at a time for a given tenant
producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), taskProto), new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.trace("Submitted task: {}", task);
@ -249,7 +253,7 @@ public class DefaultJobManager implements JobManager {
private void sendJobFinishedNotification(Job job) {
NotificationTemplate template = DefaultNotifications.DefaultNotification.builder()
.name("Job finished")
.subject("${type} ${status}")
.subject("${type} task ${status}")
.text("${description} ${status}: ${result}")
.build().toTemplate();
GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of(
@ -258,6 +262,7 @@ public class DefaultJobManager implements JobManager {
"status", job.getStatus().name().toLowerCase(),
"result", job.getResult().getDescription()
));
// todo: button to see details (forward to jobs page)
notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info);
}

View File

@ -16,13 +16,11 @@
package org.thingsboard.server.service.job.task;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.task.DummyTask;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.DummyTask;
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
import org.thingsboard.server.queue.task.TaskProcessor;
@Component
@RequiredArgsConstructor
public class DummyTaskProcessor extends TaskProcessor<DummyTask, DummyTaskResult> {

View File

@ -1628,6 +1628,11 @@ queue:
- key: max.poll.records
# Max poll records for edqs.state topic
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
tasks:
# Key-value properties for Kafka consumer for tasks topics
- key: max.poll.records
# Max poll records for tasks topics
value: "${TB_QUEUE_KAFKA_TASKS_MAX_POLL_RECORDS:1}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
@ -1668,7 +1673,7 @@ queue:
# Kafka properties for EDQS state topic (infinite retention, compaction)
edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
# Kafka properties for tasks topics
tasks: "${TB_QUEUE_KAFKA_TASKS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
tasks: "${TB_QUEUE_KAFKA_TASKS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600;partitions:1;min.insync.replicas:1}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
@ -1884,9 +1889,11 @@ queue:
# Statistics printing interval for Edge services
print-interval-ms: "${TB_QUEUE_EDGE_STATS_PRINT_INTERVAL_MS:60000}"
tasks:
# Partitions count for tasks queues
partitions: "${TB_QUEUE_TASKS_PARTITIONS:12}"
stats:
# Interval in milliseconds to process job stats
processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:5000}"
processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}"
# Event configuration parameters
event:

View File

@ -39,7 +39,6 @@ import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.service.job.task.DummyTaskProcessor;
import java.util.ArrayList;
import java.util.List;
@ -66,7 +65,7 @@ public class JobManagerTest extends AbstractControllerTest {
private JobService jobService;
@SpyBean
private DummyTaskProcessor taskProcessor;
private TestTaskProcessor taskProcessor;
@SpyBean
private JobStatsService jobStatsService;
@ -109,7 +108,7 @@ public class JobManagerTest extends AbstractControllerTest {
});
checkJobNotification(notification -> {
assertThat(notification.getSubject()).isEqualTo("Dummy job completed");
assertThat(notification.getSubject()).isEqualTo("Dummy job task completed");
assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed");
});
}
@ -145,7 +144,7 @@ public class JobManagerTest extends AbstractControllerTest {
});
checkJobNotification(notification -> {
assertThat(notification.getSubject()).isEqualTo("Dummy job failed");
assertThat(notification.getSubject()).isEqualTo("Dummy job task failed");
assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed");
});
}
@ -340,7 +339,7 @@ public class JobManagerTest extends AbstractControllerTest {
});
checkJobNotification(notification -> {
assertThat(notification.getSubject()).isEqualTo("Dummy job failed");
assertThat(notification.getSubject()).isEqualTo("Dummy job task failed");
assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks");
});
}

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.job;
import org.springframework.stereotype.Component;
import org.thingsboard.server.service.job.task.DummyTaskProcessor;
/**
 * Spring component variant of {@link DummyTaskProcessor}. It declares no members of its own
 * and inherits all behavior from the parent class.
 */
@Component
public class TestTaskProcessor extends DummyTaskProcessor {
}

View File

@ -27,7 +27,8 @@ public enum ServiceType {
TB_TRANSPORT("TB Transport"),
JS_EXECUTOR("JS Executor"),
TB_VC_EXECUTOR("TB VC Executor"),
EDQS("TB Entity Data Query Service");
EDQS("TB Entity Data Query Service"),
TASK_PROCESSOR("Task Processor");
private final String label;

View File

@ -88,6 +88,7 @@ message ServiceInfo {
SystemInfoProto systemInfo = 10;
repeated string assignedTenantProfiles = 11;
string label = 12;
repeated string taskTypes = 13;
}
message SystemInfoProto {

View File

@ -24,11 +24,13 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.task.TaskProcessor;
import org.thingsboard.server.queue.util.AfterContextReady;
import java.net.InetAddress;
@ -40,7 +42,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.common.util.SystemUtil.*;
import static org.thingsboard.common.util.SystemUtil.getCpuCount;
import static org.thingsboard.common.util.SystemUtil.getCpuUsage;
import static org.thingsboard.common.util.SystemUtil.getDiscSpaceUsage;
import static org.thingsboard.common.util.SystemUtil.getMemoryUsage;
import static org.thingsboard.common.util.SystemUtil.getTotalDiscSpace;
import static org.thingsboard.common.util.SystemUtil.getTotalMemory;
@Component
@ -65,7 +72,11 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private List<TaskProcessor<?, ?>> availableTaskProcessors;
private List<ServiceType> serviceTypes;
private List<JobType> taskTypes;
private ServiceInfo serviceInfo;
@PostConstruct
@ -91,6 +102,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
edqsConfig.setLabel(serviceId);
}
}
taskTypes = availableTaskProcessors.stream()
.map(TaskProcessor::getJobType)
.toList();
generateNewServiceInfoWithCurrentSystemInfo();
}
@ -128,6 +142,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList()));
}
builder.setLabel(edqsConfig.getLabel());
builder.addAllTaskTypes(taskTypes.stream().map(JobType::name).toList());
return serviceInfo = builder.build();
}

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -82,6 +83,8 @@ public class HashPartitionService implements PartitionService {
private Integer edgePartitions;
@Value("${queue.edqs.partitions:12}")
private Integer edqsPartitions;
@Value("${queue.tasks.partitions:12}")
private Integer tasksPartitions;
@Value("${queue.partitions.hash_function_name:murmur3_128}")
private String hashFunctionName;
@ -140,6 +143,12 @@ public class HashPartitionService implements PartitionService {
QueueKey edqsKey = new QueueKey(ServiceType.EDQS);
partitionSizesMap.put(edqsKey, edqsPartitions);
partitionTopicsMap.put(edqsKey, "edqs"); // placeholder, not used
for (JobType jobType : JobType.values()) {
QueueKey queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, jobType.name());
partitionSizesMap.put(queueKey, tasksPartitions);
partitionTopicsMap.put(queueKey, jobType.getTasksTopic());
}
}
@AfterStartUp(order = AfterStartUp.QUEUE_INFO_INITIALIZATION)
@ -454,8 +463,8 @@ public class HashPartitionService implements PartitionService {
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
partitionSizesMap.keySet().stream()
.filter(queueKey -> queueKey.getType() == ServiceType.TB_RULE_ENGINE &&
!queueKey.getTenantId().isSysTenantId() &&
!newPartitions.containsKey(queueKey))
!queueKey.getTenantId().isSysTenantId() &&
!newPartitions.containsKey(queueKey))
.forEach(removed::add);
}
removed.forEach(queueKey -> {
@ -675,6 +684,10 @@ public class HashPartitionService implements PartitionService {
for (String transportType : instance.getTransportsList()) {
tbTransportServicesByType.computeIfAbsent(transportType, t -> new ArrayList<>()).add(instance);
}
for (String taskType : instance.getTaskTypesList()) {
QueueKey queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, taskType);
queueServiceList.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(instance);
}
}
@NotNull

View File

@ -22,26 +22,27 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
@ -51,29 +52,35 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
private TaskProcessorQueueFactory queueFactory;
@Autowired
private JobStatsService statsService;
@Autowired
private TaskProcessorExecutors executors;
private QueueConsumerManager<TbProtoQueueMsg<TaskProto>> taskConsumer;
private ExecutorService consumerExecutor;
private QueueKey queueKey;
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer;
private final Set<UUID> deletedTenants = ConcurrentHashMap.newKeySet();
private final Set<UUID> discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine
@PostConstruct
public void init() {
consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-consumer"));
taskConsumer = QueueConsumerManager.<TbProtoQueueMsg<TaskProto>>builder() // fixme: should be consumer per partition
.name(getJobType().name().toLowerCase() + "-tasks")
.msgPackProcessor(this::processMsgs) // todo: max.poll.records = 1
.pollInterval(125)
.consumerCreator(() -> queueFactory.createTaskConsumer(getJobType()))
.consumerExecutor(consumerExecutor)
queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name());
taskConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TaskProto>, QueueConfig>builder()
.queueKey(queueKey)
.config(QueueConfig.of(true, 500))
.msgPackProcessor(this::processMsgs)
.consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType()))
.consumerExecutor(executors.getConsumersExecutor())
.scheduler(executors.getScheduler())
.taskExecutor(executors.getMgmtExecutor())
.build();
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
taskConsumer.subscribe();
taskConsumer.launch();
/**
 * Applies partition changes to this processor's task consumer.
 * <p>
 * Only events whose service type is {@link ServiceType#TASK_PROCESSOR} are considered. The
 * event's partition map is looked up by this processor's queue key; when the map holds no
 * entry for that key the event is ignored instead of passing {@code null} to the consumer
 * manager.
 *
 * @param event partition change notification
 */
@EventListener
public void onPartitionChangeEvent(PartitionChangeEvent event) {
    if (event.getServiceType() != ServiceType.TASK_PROCESSOR) {
        return;
    }
    Set<TopicPartitionInfo> partitions = event.getNewPartitions().get(queueKey);
    if (partitions == null) {
        // The event carries no entry for this job type's queue key, so there is nothing to apply.
        return;
    }
    taskConsumer.update(partitions);
}
@EventListener
@ -95,7 +102,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
}
}
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) throws Exception {
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer, QueueConfig queueConfig) throws Exception {
for (TbProtoQueueMsg<TaskProto> msg : msgs) {
try {
@SuppressWarnings("unchecked")
@ -159,7 +166,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
@PreDestroy
public void destroy() {
taskConsumer.stop();
consumerExecutor.shutdownNow();
taskConsumer.awaitStop();
}

View File

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Lazily initialized Spring component holding the executors used by task queue consumers.
 * Lombok's {@code @Getter} exposes each of the three executors through a generated accessor.
 */
@Getter
@Lazy
@Component
public class TaskProcessorExecutors {

    private ExecutorService consumersExecutor;
    private ExecutorService mgmtExecutor;
    private ScheduledExecutorService scheduler;

    /**
     * Creates the three executors after the bean has been constructed.
     */
    @PostConstruct
    private void init() {
        consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("task-consumer"));
        mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "task-consumer-mgmt");
        scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("task-consumer-scheduler");
    }

    /**
     * Shuts down every executor that was created, in the order consumers, management, scheduler.
     */
    @PreDestroy
    private void destroy() {
        shutdownNowIfCreated(consumersExecutor);
        shutdownNowIfCreated(mgmtExecutor);
        shutdownNowIfCreated(scheduler);
    }

    /**
     * Calls {@code shutdownNow()} on the given executor unless it is {@code null}.
     */
    private static void shutdownNowIfCreated(ExecutorService pool) {
        if (pool == null) {
            return;
        }
        pool.shutdownNow();
    }

}