Jobs: refactoring

This commit is contained in:
ViacheslavKlimov 2025-05-13 14:48:46 +03:00
parent 7d0b6bfdec
commit 4fbb6c2e71
13 changed files with 55 additions and 28 deletions

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.service.job;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
@ -51,6 +50,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.task.JobStatsService; import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
@ -74,23 +74,21 @@ public class DefaultJobManager implements JobManager {
private final JobStatsService jobStatsService; private final JobStatsService jobStatsService;
private final NotificationCenter notificationCenter; private final NotificationCenter notificationCenter;
private final PartitionService partitionService; private final PartitionService partitionService;
private final TasksQueueConfig queueConfig;
private final Map<JobType, JobProcessor> jobProcessors; private final Map<JobType, JobProcessor> jobProcessors;
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers; private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer; private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService executor; private final ExecutorService executor;
private final ExecutorService consumerExecutor; private final ExecutorService consumerExecutor;
@Value("${queue.tasks.partitioning_strategy:tenant}")
private String tasksPartitioningStrategy;
@Value("${queue.tasks.stats.processing_interval_ms:1000}")
private int statsProcessingInterval;
public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter, public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter,
PartitionService partitionService, TbCoreQueueFactory queueFactory, List<JobProcessor> jobProcessors) { PartitionService partitionService, TbCoreQueueFactory queueFactory, TasksQueueConfig queueConfig,
List<JobProcessor> jobProcessors) {
this.jobService = jobService; this.jobService = jobService;
this.jobStatsService = jobStatsService; this.jobStatsService = jobStatsService;
this.notificationCenter = notificationCenter; this.notificationCenter = notificationCenter;
this.partitionService = partitionService; this.partitionService = partitionService;
this.queueConfig = queueConfig;
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass());
@ -98,7 +96,7 @@ public class DefaultJobManager implements JobManager {
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder() this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
.name("job-stats") .name("job-stats")
.msgPackProcessor(this::processStats) .msgPackProcessor(this::processStats)
.pollInterval(125) .pollInterval(queueConfig.getStatsPollInterval())
.consumerCreator(queueFactory::createJobStatsConsumer) .consumerCreator(queueFactory::createJobStatsConsumer)
.consumerExecutor(consumerExecutor) .consumerExecutor(consumerExecutor)
.build(); .build();
@ -113,7 +111,7 @@ public class DefaultJobManager implements JobManager {
@Override @Override
public Job submitJob(Job job) { public Job submitJob(Job job) {
log.debug("Submitting job: {}", job); log.debug("Submitting job: {}", job);
return jobService.submitJob(job.getTenantId(), job); return jobService.saveJob(job.getTenantId(), job);
} }
@Override @Override
@ -196,7 +194,7 @@ public class DefaultJobManager implements JobManager {
job.getConfiguration().setToReprocess(taskFailures); job.getConfiguration().setToReprocess(taskFailures);
jobService.submitJob(tenantId, job); jobService.saveJob(tenantId, job);
} }
private void submitTask(Task<?> task) { private void submitTask(Task<?> task) {
@ -207,7 +205,7 @@ public class DefaultJobManager implements JobManager {
TbQueueProducer<TbProtoQueueMsg<TaskProto>> producer = taskProducers.get(task.getJobType()); TbQueueProducer<TbProtoQueueMsg<TaskProto>> producer = taskProducers.get(task.getJobType());
EntityId entityId = null; EntityId entityId = null;
if (tasksPartitioningStrategy.equals("entity")) { if (queueConfig.getPartitioningStrategy().equals("entity")) {
entityId = task.getEntityId(); entityId = task.getEntityId();
} }
if (entityId == null) { if (entityId == null) {
@ -257,7 +255,7 @@ public class DefaultJobManager implements JobManager {
}); });
consumer.commit(); consumer.commit();
Thread.sleep(statsProcessingInterval); Thread.sleep(queueConfig.getStatsProcessingInterval());
} }
private void sendJobFinishedNotification(Job job) { private void sendJobFinishedNotification(Job job) {

View File

@ -1908,9 +1908,12 @@ queue:
# In a single-tenant environment, use 'entity' strategy to distribute the tasks among multiple partitions. # In a single-tenant environment, use 'entity' strategy to distribute the tasks among multiple partitions.
partitioning_strategy: "${TB_QUEUE_TASKS_PARTITIONING_STRATEGY:tenant}" partitioning_strategy: "${TB_QUEUE_TASKS_PARTITIONING_STRATEGY:tenant}"
stats: stats:
# Name for the tasks stats topic
topic: "${TB_QUEUE_TASKS_STATS_TOPIC:jobs.stats}" topic: "${TB_QUEUE_TASKS_STATS_TOPIC:jobs.stats}"
# Poll interval in milliseconds for tasks stats topic
poll_interval: "${TB_QUEUE_TASKS_STATS_POLL_INTERVAL_MS:500}"
# Interval in milliseconds to process job stats # Interval in milliseconds to process job stats
processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}" processing_interval: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}"
# Event configuration parameters # Event configuration parameters
event: event:

View File

@ -52,7 +52,7 @@ import static org.mockito.Mockito.verify;
@DaoSqlTest @DaoSqlTest
@TestPropertySource(properties = { @TestPropertySource(properties = {
"queue.tasks.stats.processing_interval_ms=0" "queue.tasks.stats.processing_interval=0"
}) })
public class JobManagerTest extends AbstractControllerTest { public class JobManagerTest extends AbstractControllerTest {
@ -203,7 +203,7 @@ public class JobManagerTest extends AbstractControllerTest {
.description("test job") .description("test job")
.configuration(DummyJobConfiguration.builder() .configuration(DummyJobConfiguration.builder()
.successfulTasksCount(tasksCount) .successfulTasksCount(tasksCount)
.taskProcessingTimeMs(100) .taskProcessingTimeMs(500)
.build()) .build())
.build()).getId(); .build()).getId();

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
@DaoSqlTest @DaoSqlTest
@TestPropertySource(properties = { @TestPropertySource(properties = {
"queue.tasks.stats.processing_interval_ms=0", "queue.tasks.stats.processing_interval=0",
"queue.tasks.partitioning_strategy=entity", "queue.tasks.partitioning_strategy=entity",
"queue.tasks.partitions_per_type=DUMMY:100;DUMMY:50" "queue.tasks.partitions_per_type=DUMMY:100;DUMMY:50"
}) })

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.dao.entity.EntityDaoService;
public interface JobService extends EntityDaoService { public interface JobService extends EntityDaoService {
Job submitJob(TenantId tenantId, Job job); Job saveJob(TenantId tenantId, Job job);
Job findJobById(TenantId tenantId, JobId jobId); Job findJobById(TenantId tenantId, JobId jobId);

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.common.data.job; package org.thingsboard.server.common.data.job;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import lombok.Builder; import lombok.Builder;
@ -43,6 +44,7 @@ public class Job extends BaseData<JobId> implements HasTenantId {
private String description; private String description;
private JobStatus status; private JobStatus status;
@NotNull @NotNull
@Valid
private JobConfiguration configuration; private JobConfiguration configuration;
private JobResult result; private JobResult result;

View File

@ -44,6 +44,8 @@ public abstract class JobResult implements Serializable {
private List<TaskResult> results = new ArrayList<>(); private List<TaskResult> results = new ArrayList<>();
private String generalError; private String generalError;
private long startTs;
private long finishTs;
private long cancellationTs; private long cancellationTs;
@JsonIgnore @JsonIgnore

View File

@ -25,8 +25,10 @@ import java.util.List;
@Data @Data
public class JobStats { public class JobStats {
private final TenantId tenantId; private final TenantId tenantId;
private final JobId jobId; private final JobId jobId;
private final List<TaskResult> taskResults = new ArrayList<>(); private final List<TaskResult> taskResults = new ArrayList<>();
private Integer totalTasksCount; private Integer totalTasksCount;
} }

View File

@ -536,10 +536,6 @@ message ToEdqsCoreServiceMsg {
bytes value = 1; bytes value = 1;
} }
message ToJobManagerMsg {
bytes value = 1;
}
message LwM2MRegistrationRequestMsg { message LwM2MRegistrationRequestMsg {
string tenantId = 1; string tenantId = 1;
string endpoint = 2; string endpoint = 2;

View File

@ -25,7 +25,11 @@ import org.thingsboard.server.queue.TbQueueMsg;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;
@Slf4j @Slf4j
@ -39,6 +43,7 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
@Getter @Getter
private final TbQueueConsumer<M> consumer; private final TbQueueConsumer<M> consumer;
private Future<?> consumerTask;
private volatile boolean stopped; private volatile boolean stopped;
@Builder @Builder
@ -63,7 +68,7 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
public void launch() { public void launch() {
log.info("[{}] Launching consumer", name); log.info("[{}] Launching consumer", name);
consumerExecutor.submit(() -> { consumerTask = consumerExecutor.submit(() -> {
if (threadPrefix != null) { if (threadPrefix != null) {
ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix); ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix);
} }
@ -101,6 +106,13 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
log.debug("[{}] Stopping consumer", name); log.debug("[{}] Stopping consumer", name);
stopped = true; stopped = true;
consumer.unsubscribe(); consumer.unsubscribe();
try {
if (consumerTask != null) {
consumerTask.get(10, TimeUnit.SECONDS);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("[{}] Failed to await consumer loop stop", name, e);
}
} }
public interface MsgPackProcessor<M extends TbQueueMsg> { public interface MsgPackProcessor<M extends TbQueueMsg> {

View File

@ -15,18 +15,27 @@
*/ */
package org.thingsboard.server.queue.settings; package org.thingsboard.server.queue.settings;
import lombok.Data; import lombok.Getter;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Data @Getter
@Component @Component
public class TasksQueueConfig { public class TasksQueueConfig {
@Value("${queue.tasks.poll_interval}") @Value("${queue.tasks.poll_interval:500}")
private int pollInterval; private int pollInterval;
@Value("${queue.tasks.stats.topic}") @Value("${queue.tasks.partitioning_strategy:tenant}")
private String partitioningStrategy;
@Value("${queue.tasks.stats.topic:jobs.stats}")
private String statsTopic; private String statsTopic;
@Value("${queue.tasks.stats.poll_interval:500}")
private int statsPollInterval;
@Value("${queue.tasks.stats.processing_interval:1000}")
private int statsProcessingInterval;
} }

View File

@ -53,12 +53,13 @@ public class JobStatsService {
} }
private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) { private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) {
log.debug("[{}] Reporting: {}", jobId, statsMsg); log.debug("[{}][{}] Reporting: {}", tenantId, jobId, statsMsg);
statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setJobIdMSB(jobId.getId().getMostSignificantBits()) .setJobIdMSB(jobId.getId().getMostSignificantBits())
.setJobIdLSB(jobId.getId().getLeastSignificantBits()); .setJobIdLSB(jobId.getId().getLeastSignificantBits());
// using job id as msg key so that all stats for a certain job are submitted to the same partition
TbProtoQueueMsg<JobStatsMsg> msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build()); TbProtoQueueMsg<JobStatsMsg> msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build());
producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY); producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY);
} }

View File

@ -54,7 +54,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
@Transactional @Transactional
@Override @Override
public Job submitJob(TenantId tenantId, Job job) { public Job saveJob(TenantId tenantId, Job job) {
if (jobDao.existsByTenantAndKeyAndStatusOneOf(tenantId, job.getKey(), QUEUED, PENDING, RUNNING)) { if (jobDao.existsByTenantAndKeyAndStatusOneOf(tenantId, job.getKey(), QUEUED, PENDING, RUNNING)) {
throw new IllegalArgumentException("The same job is already queued or running"); throw new IllegalArgumentException("The same job is already queued or running");
} }
@ -62,6 +62,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
job.setStatus(QUEUED); job.setStatus(QUEUED);
} else { } else {
job.setStatus(PENDING); job.setStatus(PENDING);
job.getResult().setStartTs(System.currentTimeMillis());
} }
return saveJob(tenantId, job, true, null); return saveJob(tenantId, job, true, null);
} }
@ -140,6 +141,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
job.setStatus(COMPLETED); job.setStatus(COMPLETED);
publishEvent = true; publishEvent = true;
} }
result.setFinishTs(System.currentTimeMillis());
} }
} }