diff --git a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java index f899d0f480..e5641b6748 100644 --- a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java +++ b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java @@ -97,7 +97,8 @@ public class CalculatedFieldController extends BaseController { public static final int TIMEOUT = 20; - private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n" + private static final String TEST_SCRIPT_EXPRESSION = + "Execute the Script expression and return the result. The format of request: \n\n" + MARKDOWN_CODE_BLOCK_START + "{\n" + " \"expression\": \"var temp = 0; foreach(element: temperature.values) {temp += element.value;} var avgTemperature = temp / temperature.values.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity.value; return {\\\"adjustedTemperature\\\": adjustedTemperature};\",\n" + diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index f7bc5255d3..d315a522ae 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.task.JobService; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.job.JobManager; import java.util.UUID; @@ -47,10 +49,12 @@ import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERT public class JobController extends BaseController { private final JobService jobService; + private final JobManager jobManager; @GetMapping("/job/{id}") @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") public Job getJobById(@PathVariable UUID id) throws ThingsboardException { + // todo check permissions return jobService.findJobById(getTenantId(), new JobId(id)); } @@ -66,8 +70,16 @@ public class JobController extends BaseController { @RequestParam(required = false) String sortProperty, @Parameter(description = SORT_ORDER_DESCRIPTION) @RequestParam(required = false) String sortOrder) throws ThingsboardException { + // todo check permissions PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); return jobService.findJobsByTenantId(getTenantId(), pageLink); } + @PostMapping("/job/{id}/cancel") + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + public void cancelJob(@PathVariable UUID id) throws ThingsboardException { + // todo check permissions + jobManager.cancelJob(getTenantId(), new JobId(id)); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 648e89adc9..68fb0bb7cf 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -134,6 +135,9 @@ public class EntityStateSourcingListener { case CALCULATED_FIELD -> { onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity()); } + case JOB -> { + onJobUpdate((Job) event.getEntity()); + } default -> { } } @@ -212,8 +216,8 @@ public class EntityStateSourcingListener { public void handleEvent(ActionEntityEvent event) { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); if (ActionType.CREDENTIALS_UPDATED.equals(event.getActionType()) && - EntityType.DEVICE.equals(event.getEntityId().getEntityType()) - && event.getEntity() instanceof DeviceCredentials) { + EntityType.DEVICE.equals(event.getEntityId().getEntityType()) + && event.getEntity() instanceof DeviceCredentials) { tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(event.getTenantId(), (DeviceId) event.getEntityId(), (DeviceCredentials) event.getEntity()), null); } else if (ActionType.ASSIGNED_TO_TENANT.equals(event.getActionType()) && event.getEntity() instanceof Device device) { @@ -295,6 +299,12 @@ public class EntityStateSourcingListener { tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField, TbQueueCallback.EMPTY); } + private void onJobUpdate(Job job) { + if (job.getResult().getCancellationTs() > 0) { + tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED); + } + } + private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice)); if (data != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 64c1615310..727beaf859 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.JobId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobType; @@ -62,7 +63,7 @@ public class DefaultJobManager implements JobManager { private final JobStatsService jobStatsService; private final Map jobProcessors; private final Map>> taskProducers; - private final QueueConsumerManager> taskResultConsumer; + private final QueueConsumerManager> jobStatsConsumer; private final ExecutorService consumerExecutor; @Value("${queue.tasks.stats.processing_interval_ms:5000}") @@ -73,8 +74,8 @@ public class DefaultJobManager implements JobManager { this.jobStatsService = jobStatsService; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); - this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("task-result-consumer")); - this.taskResultConsumer = QueueConsumerManager.>builder() + this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer")); + this.jobStatsConsumer = QueueConsumerManager.>builder() .name("job-stats") .msgPackProcessor(this::processStats) .pollInterval(125) @@ -85,8 +86,8 @@ public class DefaultJobManager implements JobManager { @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void afterStartUp() { - taskResultConsumer.subscribe(); - taskResultConsumer.launch(); + jobStatsConsumer.subscribe(); + jobStatsConsumer.launch(); } @Override @@ -95,10 +96,16 @@ public class DefaultJobManager implements JobManager { log.info("Submitting job: {}", job); int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); - jobStatsService.reportAllTasksSubmitted(job.getId(), tasksCount); + jobStatsService.reportAllTasksSubmitted(job.getTenantId(), job.getId(), tasksCount); return job; } + @Override + public void cancelJob(TenantId tenantId, JobId jobId) { + log.info("Cancelling job: {}", jobId); + jobService.cancelJob(tenantId, jobId); + } + private void submitTask(Task task) { log.info("Submitting task: {}", task); TaskProto taskProto = TaskProto.newBuilder() @@ -126,8 +133,9 @@ public class DefaultJobManager implements JobManager { for (TbProtoQueueMsg msg : msgs) { JobStatsMsg statsMsg = msg.getValue(); + TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); JobId jobId = new JobId(new UUID(statsMsg.getJobIdMSB(), statsMsg.getJobIdLSB())); - JobStats jobStats = stats.computeIfAbsent(jobId, JobStats::new); + JobStats jobStats = stats.computeIfAbsent(jobId, __ -> new JobStats(tenantId, jobId)); if (statsMsg.hasTaskResult()) { TaskResult taskResult = JacksonUtil.fromString(statsMsg.getTaskResult().getValue(), TaskResult.class); @@ -140,8 +148,9 @@ public class DefaultJobManager implements JobManager { stats.forEach((jobId, jobStats) -> { try { - log.info("[{}] Processing job stats: {}", jobId, stats); - jobService.processStats(jobId, jobStats); + TenantId tenantId = jobStats.getTenantId(); + log.info("[{}][{}] Processing job stats: {}", tenantId, jobId, stats); + jobService.processStats(tenantId, jobId, jobStats); } catch (Exception e) { log.warn("Failed to process job stats for {}: {}", jobId, jobStats, e); } @@ -153,7 +162,7 @@ public class DefaultJobManager implements JobManager { @PreDestroy private void destroy() { - taskResultConsumer.stop(); + jobStatsConsumer.stop(); consumerExecutor.shutdownNow(); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java index c78de2a5d9..71ff3dcaa2 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java @@ -15,10 +15,14 @@ */ package org.thingsboard.server.service.job; +import org.thingsboard.server.common.data.id.JobId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; public interface JobManager { Job submitJob(Job job); + void cancelJob(TenantId tenantId, JobId jobId); + } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 561dc7122a..cdc7abb4d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -594,6 +594,7 @@ public class DefaultTbClusterService implements TbClusterService { || entityType.equals(EntityType.ENTITY_VIEW) || entityType.equals(EntityType.NOTIFICATION_RULE) || entityType.equals(EntityType.CALCULATED_FIELD) + || entityType.equals(EntityType.JOB) ) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index f9ee2e2b13..68d2d96471 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -20,22 +20,28 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.DummyJobConfiguration; import org.thingsboard.server.common.data.job.Job; +import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.service.job.task.DummyTaskProcessor; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; @DaoSqlTest @TestPropertySource(properties = { @@ -46,6 +52,9 @@ public class JobManagerTest extends AbstractControllerTest { @Autowired private JobManager jobManager; + @SpyBean + private DummyTaskProcessor taskProcessor; + @Before public void setUp() throws Exception { loginTenantAdmin(); @@ -80,6 +89,7 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); assertThat(job.getResult().getFailures()).isEmpty(); + assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); } @@ -104,15 +114,76 @@ public class JobManagerTest extends AbstractControllerTest { await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { Job job = findJobById(jobId); assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); - assertThat(job.getResult().getSuccessfulCount()).isEqualTo(successfulTasks); - assertThat(job.getResult().getFailedCount()).isEqualTo(failedTasks); - assertThat(job.getResult().getTotalCount()).isEqualTo(successfulTasks + failedTasks); - assertThat(job.getResult().getFailures().get("Task 1")).isEqualTo("error3"); // last error - assertThat(job.getResult().getFailures().get("Task 2")).isEqualTo("error3"); // last error + JobResult jobResult = job.getResult(); + assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); + assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); + assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks); + assertThat(jobResult.getFailures().get("Task 1")).isEqualTo("error3"); // last error + assertThat(jobResult.getFailures().get("Task 2")).isEqualTo("error3"); // last error + assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); }); } + @Test + public void testCancelJob_whileRunning() throws Exception { + int tasksCount = 100; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(100) + .build()) + .build()).getId(); + Thread.sleep(500); + jobManager.cancelJob(tenantId, jobId); + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); + assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); + assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); + }); + } + + @Test + public void testCancelJob_simulateTaskProcessorRestart() { + int tasksCount = 10; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(100) + .build()) + .build()).getId(); + + // simulate cancelled jobs are forgotten + AtomicInteger cancellationRenotifyAttempt = new AtomicInteger(0); + doAnswer(inv -> { + if (cancellationRenotifyAttempt.incrementAndGet() >= 5) { + inv.callRealMethod(); + } + return null; + }).when(taskProcessor).addToCancelledJobs(any()); // ignoring cancellation event, + jobManager.cancelJob(tenantId, jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + System.err.println(job); + assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); + assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); + assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); + }); + } private Job findJobById(JobId jobId) throws Exception { return doGet("/api/job/" + jobId, Job.class); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java index a28b0e391a..9ce802b84f 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java @@ -29,7 +29,9 @@ public interface JobService extends EntityDaoService { Job findJobById(TenantId tenantId, JobId jobId); - void processStats(JobId jobId, JobStats jobStats); + void cancelJob(TenantId tenantId, JobId jobId); + + void processStats(TenantId tenantId, JobId jobId, JobStats jobStats); PageData findJobsByTenantId(TenantId tenantId, PageLink pageLink); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java index eccdfae6bb..b541458289 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java @@ -20,13 +20,15 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.io.Serializable; + @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ @Type(name = "CF_REPROCESSING", value = CfReprocessingJobConfiguration.class), @Type(name = "DUMMY", value = DummyJobConfiguration.class), }) -public interface JobConfiguration { +public interface JobConfiguration extends Serializable { JobType getType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index 07b6c4eadd..abaa86facb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.job; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; @@ -22,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -33,13 +35,21 @@ import java.util.Map; }) @Data @NoArgsConstructor -public abstract class JobResult { +public abstract class JobResult implements Serializable { private int successfulCount; private int failedCount; + private int cancelledCount; private Integer totalCount = null; // set when all tasks are submitted private Map failures = new HashMap<>(); + private long cancellationTs; + + @JsonIgnore + public int getCompletedCount() { + return successfulCount + failedCount + cancelledCount; + } + public abstract JobType getJobType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java index 6491d0998a..9d2c3d9be2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java @@ -17,12 +17,14 @@ package org.thingsboard.server.common.data.job; import lombok.Data; import org.thingsboard.server.common.data.id.JobId; +import org.thingsboard.server.common.data.id.TenantId; import java.util.ArrayList; import java.util.List; @Data public class JobStats { + private final TenantId tenantId; private final JobId jobId; private final List taskResults = new ArrayList<>(); private Integer totalTasksCount; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java index ae5e6bbba9..57f2f44d7a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java @@ -27,6 +27,7 @@ import lombok.NoArgsConstructor; public class TaskResult { private boolean success; + private boolean cancelled; private TaskFailure failure; @Data diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 032301206f..b4d436a32a 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -61,6 +61,7 @@ enum EntityTypeProto { MOBILE_APP_BUNDLE = 38; CALCULATED_FIELD = 39; CALCULATED_FIELD_LINK = 40; + JOB = 41; } enum ApiUsageRecordKeyProto { @@ -534,6 +535,10 @@ message ToEdqsCoreServiceMsg { bytes value = 1; } +message ToJobManagerMsg { + bytes value = 1; +} + message LwM2MRegistrationRequestMsg { string tenantId = 1; string endpoint = 2; @@ -1852,10 +1857,12 @@ message TaskProto { } message JobStatsMsg { - int64 jobIdMSB = 1; - int64 jobIdLSB = 2; - optional TaskResultProto taskResult = 3; - optional int32 totalTasksCount = 4; + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 jobIdMSB = 3; + int64 jobIdLSB = 4; + optional TaskResultProto taskResult = 5; + optional int32 totalTasksCount = 6; } message TaskResultProto { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java index 6c36c573ca..ceba2645f9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java @@ -21,6 +21,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.id.JobId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; @@ -38,21 +39,23 @@ public class JobStatsService { private final TbQueueProducerProvider producerProvider; - public void reportTaskResult(JobId jobId, TaskResult result) { - report(jobId, JobStatsMsg.newBuilder() + public void reportTaskResult(TenantId tenantId, JobId jobId, TaskResult result) { + report(tenantId, jobId, JobStatsMsg.newBuilder() .setTaskResult(TaskResultProto.newBuilder() .setValue(JacksonUtil.toString(result)) .build())); } - public void reportAllTasksSubmitted(JobId jobId, int tasksCount) { - report(jobId, JobStatsMsg.newBuilder() + public void reportAllTasksSubmitted(TenantId tenantId, JobId jobId, int tasksCount) { + report(tenantId, jobId, JobStatsMsg.newBuilder() .setTotalTasksCount(tasksCount)); } - private void report(JobId jobId, JobStatsMsg.Builder statsMsg) { + private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) { log.info("[{}] Reporting: {}", jobId, statsMsg); - statsMsg.setJobIdMSB(jobId.getId().getMostSignificantBits()) + statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setJobIdMSB(jobId.getId().getMostSignificantBits()) .setJobIdLSB(jobId.getId().getLeastSignificantBits()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 52fd382c3d..43d2dd74c0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -17,14 +17,20 @@ package org.thingsboard.server.queue.task; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.data.job.TaskResult.TaskFailure; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -33,12 +39,16 @@ import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -@Slf4j public abstract class TaskProcessor { + protected final Logger log = LoggerFactory.getLogger(getClass()); + @Autowired private TaskProcessorQueueFactory queueFactory; @Autowired @@ -47,12 +57,14 @@ public abstract class TaskProcessor { private QueueConsumerManager> taskConsumer; private ExecutorService consumerExecutor; + private final Set cancelledJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine + @PostConstruct public void init() { consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-consumer")); taskConsumer = QueueConsumerManager.>builder() // fixme: should be consumer per partition .name(getJobType().name().toLowerCase() + "-tasks") - .msgPackProcessor(this::processMsgs) + .msgPackProcessor(this::processMsgs) // todo: max.poll.records = 1 .pollInterval(125) .consumerCreator(() -> queueFactory.createTaskConsumer(getJobType())) .consumerExecutor(consumerExecutor) @@ -65,16 +77,27 @@ public abstract class TaskProcessor { taskConsumer.launch(); } - @PreDestroy - public void destroy() { - taskConsumer.stop(); - consumerExecutor.shutdownNow(); + @EventListener + public void onJobCancelled(ComponentLifecycleMsg event) { + if (event.getEntityId().getEntityType() != EntityType.JOB) { + return; + } + JobId jobId = (JobId) event.getEntityId(); + if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { + log.info("Adding job {} to cancelled", jobId); + addToCancelledJobs(jobId); + } } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { for (TbProtoQueueMsg msg : msgs) { TaskProto taskProto = msg.getValue(); Task task = JacksonUtil.fromString(taskProto.getValue(), Task.class); + if (cancelledJobs.contains(task.getJobId().getId())) { + log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId()); + reportCancelled(task); + continue; + } processTask((T) task); } consumer.commit(); @@ -96,11 +119,13 @@ public abstract class TaskProcessor { } } + protected abstract void process(T task) throws Exception; + private void reportSuccess(Task task) { TaskResult result = TaskResult.builder() .success(true) .build(); - statsService.reportTaskResult(task.getJobId(), result); + statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } private void reportFailure(Task task, Throwable error) { @@ -110,10 +135,26 @@ public abstract class TaskProcessor { .task(task) .build()) .build(); - statsService.reportTaskResult(task.getJobId(), result); + statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); + } + + private void reportCancelled(Task task) { + TaskResult result = TaskResult.builder() + .cancelled(true) + .build(); + statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); + } + + public void addToCancelledJobs(JobId jobId) { + cancelledJobs.add(jobId.getId()); + } + + @PreDestroy + public void destroy() { + taskConsumer.stop(); + consumerExecutor.shutdownNow(); } - protected abstract void process(T task) throws Exception; public abstract JobType getJobType(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java index 9fff4d06b7..472df05bdd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.dao.sql.task; +import jakarta.persistence.LockModeType; import jakarta.transaction.Transactional; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -40,6 +42,10 @@ public interface JobRepository extends JpaRepository { @Param("searchText") String searchText, Pageable pageable); + @Lock(LockModeType.PESSIMISTIC_WRITE) // SELECT FOR UPDATE + @Query("SELECT j FROM JobEntity j WHERE j.id = :id") + JobEntity findByIdForUpdate(UUID id); + @Modifying @Transactional @Query(value = """ diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java index b8b36dbc23..92b9fc8a72 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java @@ -49,13 +49,8 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao } @Override - public boolean reportTaskSuccess(JobId jobId, int tasksCount) { - return jobRepository.reportTaskSuccess(jobId.getId(), tasksCount); - } - - @Override - public boolean reportTaskFailure(JobId jobId, String taskKey, String error) { - return jobRepository.reportTaskFailure(jobId.getId(), taskKey, error); + public Job findByIdForUpdate(TenantId tenantId, JobId jobId) { + return DaoUtil.getData(jobRepository.findByIdForUpdate(jobId.getId())); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java index dba569daa8..1e694508fa 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.HasId; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; @@ -31,6 +32,8 @@ import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.data.job.TaskResult.TaskFailure; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.DataValidator; @@ -39,7 +42,7 @@ import java.util.Optional; @Service @RequiredArgsConstructor @Slf4j -public class DefaultJobService implements JobService { +public class DefaultJobService extends AbstractEntityService implements JobService { private final JobDao jobDao; private final JobValidator validator = new JobValidator(); @@ -47,7 +50,7 @@ public class DefaultJobService implements JobService { @Override public Job createJob(TenantId tenantId, Job job) { validator.validate(job, Job::getTenantId); - return jobDao.save(tenantId, job); + return saveJob(tenantId, job, false); } @Override @@ -55,9 +58,21 @@ public class DefaultJobService implements JobService { return jobDao.findById(tenantId, jobId.getId()); } + @Transactional @Override - public void processStats(JobId jobId, JobStats jobStats) { - Job job = jobDao.findById(TenantId.SYS_TENANT_ID, jobId.getId()); + public void cancelJob(TenantId tenantId, JobId jobId) { + Job job = findForUpdate(tenantId, jobId); + if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.RUNNING) { + throw new IllegalArgumentException("Job already " + job.getStatus().name().toLowerCase()); + } + job.getResult().setCancellationTs(System.currentTimeMillis()); + saveJob(tenantId, job, true); + } + + @Transactional + @Override + public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) { + Job job = findForUpdate(tenantId, jobId); switch (job.getStatus()) { case PENDING -> { job.setStatus(JobStatus.RUNNING); @@ -73,26 +88,52 @@ public class DefaultJobService implements JobService { jobResult.setTotalCount(jobStats.getTotalTasksCount()); } + boolean publishEvent = false; for (TaskResult taskResult : jobStats.getTaskResults()) { if (taskResult.isSuccess()) { jobResult.setSuccessfulCount(jobResult.getSuccessfulCount() + 1); + } else if (taskResult.isCancelled()) { + jobResult.setCancelledCount(jobResult.getCancelledCount() + 1); } else { TaskFailure failure = taskResult.getFailure(); String key = failure.getTask().getKey(); jobResult.setFailedCount(jobResult.getFailedCount() + 1); jobResult.getFailures().put(key, failure.getError()); } + + if (jobResult.getCancellationTs() > 0) { + if (!taskResult.isCancelled() && System.currentTimeMillis() > jobResult.getCancellationTs()) { + log.info("Got task result for cancelled job {}: {}, re-notifying processors about cancellation", jobId, taskResult); + // task processor forgot the task is cancelled + publishEvent = true; + } + } } - if (jobResult.getTotalCount() != null && jobResult.getSuccessfulCount() + jobResult.getFailedCount() >= jobResult.getTotalCount()) { - if (jobResult.getFailures().isEmpty()) { - job.setStatus(JobStatus.COMPLETED); - } else { + if (jobResult.getTotalCount() != null && jobResult.getCompletedCount() >= jobResult.getTotalCount()) { + if (jobResult.getCancellationTs() > 0) { + job.setStatus(JobStatus.CANCELLED); + } else if (jobResult.getFailedCount() > 0) { job.setStatus(JobStatus.FAILED); + } else { + job.setStatus(JobStatus.COMPLETED); } } log.info("Saving job {}", job); - jobDao.save(TenantId.SYS_TENANT_ID, job); + saveJob(tenantId, job, publishEvent); + } + + private Job saveJob(TenantId tenantId, Job job, boolean publishEvent) { + job = jobDao.save(tenantId, job); + if (publishEvent) { + eventPublisher.publishEvent(SaveEntityEvent.builder() + .tenantId(tenantId) + .entityId(job.getId()) + .entity(job) + .created(false) + .build()); + } + return job; } @Override @@ -100,6 +141,10 @@ public class DefaultJobService implements JobService { return jobDao.findByTenantId(tenantId, pageLink); } + private Job findForUpdate(TenantId tenantId, JobId jobId) { + return jobDao.findByIdForUpdate(tenantId, jobId); + } + // todo: cancellation, reprocessing public class JobValidator extends DataValidator { diff --git a/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java b/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java index 5c3c5b977c..3b1d6631dd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java @@ -28,9 +28,7 @@ public interface JobDao extends Dao { PageData findByTenantId(TenantId tenantId, PageLink pageLink); - boolean reportTaskSuccess(JobId jobId, int tasksCount); - - boolean reportTaskFailure(JobId jobId, String taskKey, String error); + Job findByIdForUpdate(TenantId tenantId, JobId jobId); boolean existsByKeyAndStatusOneOf(String key, JobStatus... statuses);