From 4fbb6c2e71a080f989900eebf894400caf22fb3e Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 13 May 2025 14:48:46 +0300 Subject: [PATCH] Jobs: refactoring --- .../server/service/job/DefaultJobManager.java | 22 +++++++++---------- .../src/main/resources/thingsboard.yml | 5 ++++- .../server/service/job/JobManagerTest.java | 4 ++-- ...anagerTest_EntityPartitioningStrategy.java | 2 +- .../server/dao/job/JobService.java | 2 +- .../server/common/data/job/Job.java | 2 ++ .../server/common/data/job/JobResult.java | 2 ++ .../server/common/data/job/JobStats.java | 2 ++ common/proto/src/main/proto/queue.proto | 4 ---- .../common/consumer/QueueConsumerManager.java | 14 +++++++++++- .../queue/settings/TasksQueueConfig.java | 17 ++++++++++---- .../server/queue/task/JobStatsService.java | 3 ++- .../server/dao/job/DefaultJobService.java | 4 +++- 13 files changed, 55 insertions(+), 28 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 8c32de4f45..580dd0ff99 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.job; import jakarta.annotation.PreDestroy; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; @@ -51,6 +50,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; +import org.thingsboard.server.queue.settings.TasksQueueConfig; import org.thingsboard.server.queue.task.JobStatsService; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -74,23 +74,21 @@ public class DefaultJobManager implements JobManager { private final JobStatsService jobStatsService; private final NotificationCenter notificationCenter; private final PartitionService partitionService; + private final TasksQueueConfig queueConfig; private final Map jobProcessors; private final Map>> taskProducers; private final QueueConsumerManager> jobStatsConsumer; private final ExecutorService executor; private final ExecutorService consumerExecutor; - @Value("${queue.tasks.partitioning_strategy:tenant}") - private String tasksPartitioningStrategy; - @Value("${queue.tasks.stats.processing_interval_ms:1000}") - private int statsProcessingInterval; - public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter, - PartitionService partitionService, TbCoreQueueFactory queueFactory, List jobProcessors) { + PartitionService partitionService, TbCoreQueueFactory queueFactory, TasksQueueConfig queueConfig, + List jobProcessors) { this.jobService = jobService; this.jobStatsService = jobStatsService; this.notificationCenter = notificationCenter; this.partitionService = partitionService; + this.queueConfig = queueConfig; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); @@ -98,7 +96,7 @@ public class DefaultJobManager implements JobManager { this.jobStatsConsumer = QueueConsumerManager.>builder() .name("job-stats") .msgPackProcessor(this::processStats) - .pollInterval(125) + .pollInterval(queueConfig.getStatsPollInterval()) .consumerCreator(queueFactory::createJobStatsConsumer) .consumerExecutor(consumerExecutor) .build(); @@ -113,7 +111,7 @@ public class DefaultJobManager implements JobManager { @Override public Job submitJob(Job job) { log.debug("Submitting job: {}", job); - return jobService.submitJob(job.getTenantId(), job); + return jobService.saveJob(job.getTenantId(), job); } @Override @@ -196,7 +194,7 @@ public class DefaultJobManager implements JobManager { job.getConfiguration().setToReprocess(taskFailures); - jobService.submitJob(tenantId, job); + jobService.saveJob(tenantId, job); } private void submitTask(Task task) { @@ -207,7 +205,7 @@ public class DefaultJobManager implements JobManager { TbQueueProducer> producer = taskProducers.get(task.getJobType()); EntityId entityId = null; - if (tasksPartitioningStrategy.equals("entity")) { + if (queueConfig.getPartitioningStrategy().equals("entity")) { entityId = task.getEntityId(); } if (entityId == null) { @@ -257,7 +255,7 @@ public class DefaultJobManager implements JobManager { }); consumer.commit(); - Thread.sleep(statsProcessingInterval); + Thread.sleep(queueConfig.getStatsProcessingInterval()); } private void sendJobFinishedNotification(Job job) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4211b4e845..de641e39b2 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1908,9 +1908,12 @@ queue: # In a single-tenant environment, use 'entity' strategy to distribute the tasks among multiple partitions. partitioning_strategy: "${TB_QUEUE_TASKS_PARTITIONING_STRATEGY:tenant}" stats: + # Name for the tasks stats topic topic: "${TB_QUEUE_TASKS_STATS_TOPIC:jobs.stats}" + # Poll interval in milliseconds for tasks stats topic + poll_interval: "${TB_QUEUE_TASKS_STATS_POLL_INTERVAL_MS:500}" # Interval in milliseconds to process job stats - processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}" + processing_interval: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}" # Event configuration parameters event: diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index 206acc450d..85b963a196 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.verify; @DaoSqlTest @TestPropertySource(properties = { - "queue.tasks.stats.processing_interval_ms=0" + "queue.tasks.stats.processing_interval=0" }) public class JobManagerTest extends AbstractControllerTest { @@ -203,7 +203,7 @@ public class JobManagerTest extends AbstractControllerTest { .description("test job") .configuration(DummyJobConfiguration.builder() .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(100) + .taskProcessingTimeMs(500) .build()) .build()).getId(); diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java index 983f30d523..a021603ca6 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java @@ -20,7 +20,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; @DaoSqlTest @TestPropertySource(properties = { - "queue.tasks.stats.processing_interval_ms=0", + "queue.tasks.stats.processing_interval=0", "queue.tasks.partitioning_strategy=entity", "queue.tasks.partitions_per_type=DUMMY:100;DUMMY:50" }) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java index 3204044880..33f9511267 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java @@ -26,7 +26,7 @@ import org.thingsboard.server.dao.entity.EntityDaoService; public interface JobService extends EntityDaoService { - Job submitJob(TenantId tenantId, Job job); + Job saveJob(TenantId tenantId, Job job); Job findJobById(TenantId tenantId, JobId jobId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java index 237c223a92..d4e69761c8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.job; +import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import lombok.Builder; @@ -43,6 +44,7 @@ public class Job extends BaseData implements HasTenantId { private String description; private JobStatus status; @NotNull + @Valid private JobConfiguration configuration; private JobResult result; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index 3af076cd19..285143dfa4 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -44,6 +44,8 @@ public abstract class JobResult implements Serializable { private List results = new ArrayList<>(); private String generalError; + private long startTs; + private long finishTs; private long cancellationTs; @JsonIgnore diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java index dc3e265f2d..50a3b1d759 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java @@ -25,8 +25,10 @@ import java.util.List; @Data public class JobStats { + private final TenantId tenantId; private final JobId jobId; private final List taskResults = new ArrayList<>(); private Integer totalTasksCount; + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 04c41a7f69..2a7d28241e 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -536,10 +536,6 @@ message ToEdqsCoreServiceMsg { bytes value = 1; } -message ToJobManagerMsg { - bytes value = 1; -} - message LwM2MRegistrationRequestMsg { string tenantId = 1; string endpoint = 2; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java index ffed499d8d..5025d887cb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java @@ -25,7 +25,11 @@ import org.thingsboard.server.queue.TbQueueMsg; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; @Slf4j @@ -39,6 +43,7 @@ public class QueueConsumerManager { @Getter private final TbQueueConsumer consumer; + private Future consumerTask; private volatile boolean stopped; @Builder @@ -63,7 +68,7 @@ public class QueueConsumerManager { public void launch() { log.info("[{}] Launching consumer", name); - consumerExecutor.submit(() -> { + consumerTask = consumerExecutor.submit(() -> { if (threadPrefix != null) { ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix); } @@ -101,6 +106,13 @@ public class QueueConsumerManager { log.debug("[{}] Stopping consumer", name); stopped = true; consumer.unsubscribe(); + try { + if (consumerTask != null) { + consumerTask.get(10, TimeUnit.SECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("[{}] Failed to await consumer loop stop", name, e); + } } public interface MsgPackProcessor { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java index f4916a411e..7c94596139 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java @@ -15,18 +15,27 @@ */ package org.thingsboard.server.queue.settings; -import lombok.Data; +import lombok.Getter; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -@Data +@Getter @Component public class TasksQueueConfig { - @Value("${queue.tasks.poll_interval}") + @Value("${queue.tasks.poll_interval:500}") private int pollInterval; - @Value("${queue.tasks.stats.topic}") + @Value("${queue.tasks.partitioning_strategy:tenant}") + private String partitioningStrategy; + + @Value("${queue.tasks.stats.topic:jobs.stats}") private String statsTopic; + @Value("${queue.tasks.stats.poll_interval:500}") + private int statsPollInterval; + + @Value("${queue.tasks.stats.processing_interval:1000}") + private int statsProcessingInterval; + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java index 28d08f593c..0b7d18fde4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java @@ -53,12 +53,13 @@ public class JobStatsService { } private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) { - log.debug("[{}] Reporting: {}", jobId, statsMsg); + log.debug("[{}][{}] Reporting: {}", tenantId, jobId, statsMsg); statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setJobIdMSB(jobId.getId().getMostSignificantBits()) .setJobIdLSB(jobId.getId().getLeastSignificantBits()); + // using job id as msg key so that all stats for a certain job are submitted to the same partition TbProtoQueueMsg msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build()); producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index ba01b10326..50b3e4bde8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -54,7 +54,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi @Transactional @Override - public Job submitJob(TenantId tenantId, Job job) { + public Job saveJob(TenantId tenantId, Job job) { if (jobDao.existsByTenantAndKeyAndStatusOneOf(tenantId, job.getKey(), QUEUED, PENDING, RUNNING)) { throw new IllegalArgumentException("The same job is already queued or running"); } @@ -62,6 +62,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi job.setStatus(QUEUED); } else { job.setStatus(PENDING); + job.getResult().setStartTs(System.currentTimeMillis()); } return saveJob(tenantId, job, true, null); } @@ -140,6 +141,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi job.setStatus(COMPLETED); publishEvent = true; } + result.setFinishTs(System.currentTimeMillis()); } }