Implement job cancellation

This commit is contained in:
ViacheslavKlimov 2025-04-25 12:21:20 +03:00
parent 64f35d2d7b
commit ee9237c416
19 changed files with 280 additions and 60 deletions

View File

@ -97,7 +97,8 @@ public class CalculatedFieldController extends BaseController {
public static final int TIMEOUT = 20;
private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n"
private static final String TEST_SCRIPT_EXPRESSION =
"Execute the Script expression and return the result. The format of request: \n\n"
+ MARKDOWN_CODE_BLOCK_START
+ "{\n" +
" \"expression\": \"var temp = 0; foreach(element: temperature.values) {temp += element.value;} var avgTemperature = temp / temperature.values.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity.value; return {\\\"adjustedTemperature\\\": adjustedTemperature};\",\n" +

View File

@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.task.JobService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.job.JobManager;
import java.util.UUID;
@ -47,10 +49,12 @@ import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERT
public class JobController extends BaseController {
private final JobService jobService;
private final JobManager jobManager;
@GetMapping("/job/{id}")
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
public Job getJobById(@PathVariable UUID id) throws ThingsboardException {
// todo check permissions
return jobService.findJobById(getTenantId(), new JobId(id));
}
@ -66,8 +70,16 @@ public class JobController extends BaseController {
@RequestParam(required = false) String sortProperty,
@Parameter(description = SORT_ORDER_DESCRIPTION)
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
// todo check permissions
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
return jobService.findJobsByTenantId(getTenantId(), pageLink);
}
@PostMapping("/job/{id}/cancel")
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
public void cancelJob(@PathVariable UUID id) throws ThingsboardException {
// todo check permissions
jobManager.cancelJob(getTenantId(), new JobId(id));
}
}

View File

@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
@ -134,6 +135,9 @@ public class EntityStateSourcingListener {
case CALCULATED_FIELD -> {
onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity());
}
case JOB -> {
onJobUpdate((Job) event.getEntity());
}
default -> {
}
}
@ -212,8 +216,8 @@ public class EntityStateSourcingListener {
public void handleEvent(ActionEntityEvent<?> event) {
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
if (ActionType.CREDENTIALS_UPDATED.equals(event.getActionType()) &&
EntityType.DEVICE.equals(event.getEntityId().getEntityType())
&& event.getEntity() instanceof DeviceCredentials) {
EntityType.DEVICE.equals(event.getEntityId().getEntityType())
&& event.getEntity() instanceof DeviceCredentials) {
tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(event.getTenantId(),
(DeviceId) event.getEntityId(), (DeviceCredentials) event.getEntity()), null);
} else if (ActionType.ASSIGNED_TO_TENANT.equals(event.getActionType()) && event.getEntity() instanceof Device device) {
@ -295,6 +299,12 @@ public class EntityStateSourcingListener {
tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField, TbQueueCallback.EMPTY);
}
private void onJobUpdate(Job job) {
if (job.getResult().getCancellationTs() > 0) {
tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED);
}
}
private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice));
if (data != null) {

View File

@ -23,6 +23,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobType;
@ -62,7 +63,7 @@ public class DefaultJobManager implements JobManager {
private final JobStatsService jobStatsService;
private final Map<JobType, JobProcessor> jobProcessors;
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> taskResultConsumer;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService consumerExecutor;
@Value("${queue.tasks.stats.processing_interval_ms:5000}")
@ -73,8 +74,8 @@ public class DefaultJobManager implements JobManager {
this.jobStatsService = jobStatsService;
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("task-result-consumer"));
this.taskResultConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer"));
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
.name("job-stats")
.msgPackProcessor(this::processStats)
.pollInterval(125)
@ -85,8 +86,8 @@ public class DefaultJobManager implements JobManager {
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
taskResultConsumer.subscribe();
taskResultConsumer.launch();
jobStatsConsumer.subscribe();
jobStatsConsumer.launch();
}
@Override
@ -95,10 +96,16 @@ public class DefaultJobManager implements JobManager {
log.info("Submitting job: {}", job);
int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask);
jobStatsService.reportAllTasksSubmitted(job.getId(), tasksCount);
jobStatsService.reportAllTasksSubmitted(job.getTenantId(), job.getId(), tasksCount);
return job;
}
@Override
public void cancelJob(TenantId tenantId, JobId jobId) {
log.info("Cancelling job: {}", jobId);
jobService.cancelJob(tenantId, jobId);
}
private void submitTask(Task task) {
log.info("Submitting task: {}", task);
TaskProto taskProto = TaskProto.newBuilder()
@ -126,8 +133,9 @@ public class DefaultJobManager implements JobManager {
for (TbProtoQueueMsg<JobStatsMsg> msg : msgs) {
JobStatsMsg statsMsg = msg.getValue();
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
JobId jobId = new JobId(new UUID(statsMsg.getJobIdMSB(), statsMsg.getJobIdLSB()));
JobStats jobStats = stats.computeIfAbsent(jobId, JobStats::new);
JobStats jobStats = stats.computeIfAbsent(jobId, __ -> new JobStats(tenantId, jobId));
if (statsMsg.hasTaskResult()) {
TaskResult taskResult = JacksonUtil.fromString(statsMsg.getTaskResult().getValue(), TaskResult.class);
@ -140,8 +148,9 @@ public class DefaultJobManager implements JobManager {
stats.forEach((jobId, jobStats) -> {
try {
log.info("[{}] Processing job stats: {}", jobId, stats);
jobService.processStats(jobId, jobStats);
TenantId tenantId = jobStats.getTenantId();
log.info("[{}][{}] Processing job stats: {}", tenantId, jobId, stats);
jobService.processStats(tenantId, jobId, jobStats);
} catch (Exception e) {
log.warn("Failed to process job stats for {}: {}", jobId, jobStats, e);
}
@ -153,7 +162,7 @@ public class DefaultJobManager implements JobManager {
@PreDestroy
private void destroy() {
taskResultConsumer.stop();
jobStatsConsumer.stop();
consumerExecutor.shutdownNow();
}

View File

@ -15,10 +15,14 @@
*/
package org.thingsboard.server.service.job;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
public interface JobManager {
Job submitJob(Job job);
void cancelJob(TenantId tenantId, JobId jobId);
}

View File

@ -594,6 +594,7 @@ public class DefaultTbClusterService implements TbClusterService {
|| entityType.equals(EntityType.ENTITY_VIEW)
|| entityType.equals(EntityType.NOTIFICATION_RULE)
|| entityType.equals(EntityType.CALCULATED_FIELD)
|| entityType.equals(EntityType.JOB)
) {
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);

View File

@ -20,22 +20,28 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.job.task.DummyTaskProcessor;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@DaoSqlTest
@TestPropertySource(properties = {
@ -46,6 +52,9 @@ public class JobManagerTest extends AbstractControllerTest {
@Autowired
private JobManager jobManager;
@SpyBean
private DummyTaskProcessor taskProcessor;
@Before
public void setUp() throws Exception {
loginTenantAdmin();
@ -80,6 +89,7 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED);
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getFailures()).isEmpty();
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
});
}
@ -104,15 +114,76 @@ public class JobManagerTest extends AbstractControllerTest {
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
Job job = findJobById(jobId);
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(successfulTasks);
assertThat(job.getResult().getFailedCount()).isEqualTo(failedTasks);
assertThat(job.getResult().getTotalCount()).isEqualTo(successfulTasks + failedTasks);
assertThat(job.getResult().getFailures().get("Task 1")).isEqualTo("error3"); // last error
assertThat(job.getResult().getFailures().get("Task 2")).isEqualTo("error3"); // last error
JobResult jobResult = job.getResult();
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks);
assertThat(jobResult.getFailures().get("Task 1")).isEqualTo("error3"); // last error
assertThat(jobResult.getFailures().get("Task 2")).isEqualTo("error3"); // last error
assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
});
}
@Test
public void testCancelJob_whileRunning() throws Exception {
int tasksCount = 100;
JobId jobId = jobManager.submitJob(Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job")
.description("test job")
.configuration(DummyJobConfiguration.builder()
.successfulTasksCount(tasksCount)
.taskProcessingTimeMs(100)
.build())
.build()).getId();
Thread.sleep(500);
jobManager.cancelJob(tenantId, jobId);
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
Job job = findJobById(jobId);
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
});
}
@Test
public void testCancelJob_simulateTaskProcessorRestart() {
int tasksCount = 10;
JobId jobId = jobManager.submitJob(Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job")
.description("test job")
.configuration(DummyJobConfiguration.builder()
.successfulTasksCount(tasksCount)
.taskProcessingTimeMs(100)
.build())
.build()).getId();
// simulate cancelled jobs are forgotten
AtomicInteger cancellationRenotifyAttempt = new AtomicInteger(0);
doAnswer(inv -> {
if (cancellationRenotifyAttempt.incrementAndGet() >= 5) {
inv.callRealMethod();
}
return null;
}).when(taskProcessor).addToCancelledJobs(any()); // ignoring cancellation event,
jobManager.cancelJob(tenantId, jobId);
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
Job job = findJobById(jobId);
System.err.println(job);
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
});
}
private Job findJobById(JobId jobId) throws Exception {
return doGet("/api/job/" + jobId, Job.class);

View File

@ -29,7 +29,9 @@ public interface JobService extends EntityDaoService {
Job findJobById(TenantId tenantId, JobId jobId);
void processStats(JobId jobId, JobStats jobStats);
void cancelJob(TenantId tenantId, JobId jobId);
void processStats(TenantId tenantId, JobId jobId, JobStats jobStats);
PageData<Job> findJobsByTenantId(TenantId tenantId, PageLink pageLink);

View File

@ -20,13 +20,15 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.Serializable;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@Type(name = "CF_REPROCESSING", value = CfReprocessingJobConfiguration.class),
@Type(name = "DUMMY", value = DummyJobConfiguration.class),
})
public interface JobConfiguration {
public interface JobConfiguration extends Serializable {
JobType getType();

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.data.job;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
@ -22,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@ -33,13 +35,21 @@ import java.util.Map;
})
@Data
@NoArgsConstructor
public abstract class JobResult {
public abstract class JobResult implements Serializable {
private int successfulCount;
private int failedCount;
private int cancelledCount;
private Integer totalCount = null; // set when all tasks are submitted
private Map<String, String> failures = new HashMap<>();
private long cancellationTs;
@JsonIgnore
public int getCompletedCount() {
return successfulCount + failedCount + cancelledCount;
}
public abstract JobType getJobType();
}

View File

@ -17,12 +17,14 @@ package org.thingsboard.server.common.data.job;
import lombok.Data;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.ArrayList;
import java.util.List;
@Data
public class JobStats {
private final TenantId tenantId;
private final JobId jobId;
private final List<TaskResult> taskResults = new ArrayList<>();
private Integer totalTasksCount;

View File

@ -27,6 +27,7 @@ import lombok.NoArgsConstructor;
public class TaskResult {
private boolean success;
private boolean cancelled;
private TaskFailure failure;
@Data

View File

@ -61,6 +61,7 @@ enum EntityTypeProto {
MOBILE_APP_BUNDLE = 38;
CALCULATED_FIELD = 39;
CALCULATED_FIELD_LINK = 40;
JOB = 41;
}
enum ApiUsageRecordKeyProto {
@ -534,6 +535,10 @@ message ToEdqsCoreServiceMsg {
bytes value = 1;
}
message ToJobManagerMsg {
bytes value = 1;
}
message LwM2MRegistrationRequestMsg {
string tenantId = 1;
string endpoint = 2;
@ -1852,10 +1857,12 @@ message TaskProto {
}
message JobStatsMsg {
int64 jobIdMSB = 1;
int64 jobIdLSB = 2;
optional TaskResultProto taskResult = 3;
optional int32 totalTasksCount = 4;
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 jobIdMSB = 3;
int64 jobIdLSB = 4;
optional TaskResultProto taskResult = 5;
optional int32 totalTasksCount = 6;
}
message TaskResultProto {

View File

@ -21,6 +21,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
@ -38,21 +39,23 @@ public class JobStatsService {
private final TbQueueProducerProvider producerProvider;
public void reportTaskResult(JobId jobId, TaskResult result) {
report(jobId, JobStatsMsg.newBuilder()
public void reportTaskResult(TenantId tenantId, JobId jobId, TaskResult result) {
report(tenantId, jobId, JobStatsMsg.newBuilder()
.setTaskResult(TaskResultProto.newBuilder()
.setValue(JacksonUtil.toString(result))
.build()));
}
public void reportAllTasksSubmitted(JobId jobId, int tasksCount) {
report(jobId, JobStatsMsg.newBuilder()
public void reportAllTasksSubmitted(TenantId tenantId, JobId jobId, int tasksCount) {
report(tenantId, jobId, JobStatsMsg.newBuilder()
.setTotalTasksCount(tasksCount));
}
private void report(JobId jobId, JobStatsMsg.Builder statsMsg) {
private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) {
log.info("[{}] Reporting: {}", jobId, statsMsg);
statsMsg.setJobIdMSB(jobId.getId().getMostSignificantBits())
statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setJobIdMSB(jobId.getId().getMostSignificantBits())
.setJobIdLSB(jobId.getId().getLeastSignificantBits());
TbProtoQueueMsg<JobStatsMsg> msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build());

View File

@ -17,14 +17,20 @@ package org.thingsboard.server.queue.task;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.data.job.TaskResult.TaskFailure;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -33,12 +39,16 @@ import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public abstract class TaskProcessor<T extends Task> {
protected final Logger log = LoggerFactory.getLogger(getClass());
@Autowired
private TaskProcessorQueueFactory queueFactory;
@Autowired
@ -47,12 +57,14 @@ public abstract class TaskProcessor<T extends Task> {
private QueueConsumerManager<TbProtoQueueMsg<TaskProto>> taskConsumer;
private ExecutorService consumerExecutor;
private final Set<UUID> cancelledJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine
@PostConstruct
public void init() {
consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-consumer"));
taskConsumer = QueueConsumerManager.<TbProtoQueueMsg<TaskProto>>builder() // fixme: should be consumer per partition
.name(getJobType().name().toLowerCase() + "-tasks")
.msgPackProcessor(this::processMsgs)
.msgPackProcessor(this::processMsgs) // todo: max.poll.records = 1
.pollInterval(125)
.consumerCreator(() -> queueFactory.createTaskConsumer(getJobType()))
.consumerExecutor(consumerExecutor)
@ -65,16 +77,27 @@ public abstract class TaskProcessor<T extends Task> {
taskConsumer.launch();
}
@PreDestroy
public void destroy() {
taskConsumer.stop();
consumerExecutor.shutdownNow();
@EventListener
public void onJobCancelled(ComponentLifecycleMsg event) {
if (event.getEntityId().getEntityType() != EntityType.JOB) {
return;
}
JobId jobId = (JobId) event.getEntityId();
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
log.info("Adding job {} to cancelled", jobId);
addToCancelledJobs(jobId);
}
}
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) {
for (TbProtoQueueMsg<TaskProto> msg : msgs) {
TaskProto taskProto = msg.getValue();
Task task = JacksonUtil.fromString(taskProto.getValue(), Task.class);
if (cancelledJobs.contains(task.getJobId().getId())) {
log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId());
reportCancelled(task);
continue;
}
processTask((T) task);
}
consumer.commit();
@ -96,11 +119,13 @@ public abstract class TaskProcessor<T extends Task> {
}
}
protected abstract void process(T task) throws Exception;
private void reportSuccess(Task task) {
TaskResult result = TaskResult.builder()
.success(true)
.build();
statsService.reportTaskResult(task.getJobId(), result);
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
}
private void reportFailure(Task task, Throwable error) {
@ -110,10 +135,26 @@ public abstract class TaskProcessor<T extends Task> {
.task(task)
.build())
.build();
statsService.reportTaskResult(task.getJobId(), result);
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
}
private void reportCancelled(Task task) {
TaskResult result = TaskResult.builder()
.cancelled(true)
.build();
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
}
public void addToCancelledJobs(JobId jobId) {
cancelledJobs.add(jobId.getId());
}
@PreDestroy
public void destroy() {
taskConsumer.stop();
consumerExecutor.shutdownNow();
}
protected abstract void process(T task) throws Exception;
public abstract JobType getJobType();

View File

@ -15,10 +15,12 @@
*/
package org.thingsboard.server.dao.sql.task;
import jakarta.persistence.LockModeType;
import jakarta.transaction.Transactional;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -40,6 +42,10 @@ public interface JobRepository extends JpaRepository<JobEntity, UUID> {
@Param("searchText") String searchText,
Pageable pageable);
@Lock(LockModeType.PESSIMISTIC_WRITE) // SELECT FOR UPDATE
@Query("SELECT j FROM JobEntity j WHERE j.id = :id")
JobEntity findByIdForUpdate(UUID id);
@Modifying
@Transactional
@Query(value = """

View File

@ -49,13 +49,8 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
}
@Override
public boolean reportTaskSuccess(JobId jobId, int tasksCount) {
return jobRepository.reportTaskSuccess(jobId.getId(), tasksCount);
}
@Override
public boolean reportTaskFailure(JobId jobId, String taskKey, String error) {
return jobRepository.reportTaskFailure(jobId.getId(), taskKey, error);
public Job findByIdForUpdate(TenantId tenantId, JobId jobId) {
return DaoUtil.getData(jobRepository.findByIdForUpdate(jobId.getId()));
}
@Override

View File

@ -21,6 +21,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.HasId;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
@ -31,6 +32,8 @@ import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.data.job.TaskResult.TaskFailure;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
@ -39,7 +42,7 @@ import java.util.Optional;
@Service
@RequiredArgsConstructor
@Slf4j
public class DefaultJobService implements JobService {
public class DefaultJobService extends AbstractEntityService implements JobService {
private final JobDao jobDao;
private final JobValidator validator = new JobValidator();
@ -47,7 +50,7 @@ public class DefaultJobService implements JobService {
@Override
public Job createJob(TenantId tenantId, Job job) {
validator.validate(job, Job::getTenantId);
return jobDao.save(tenantId, job);
return saveJob(tenantId, job, false);
}
@Override
@ -55,9 +58,21 @@ public class DefaultJobService implements JobService {
return jobDao.findById(tenantId, jobId.getId());
}
@Transactional
@Override
public void processStats(JobId jobId, JobStats jobStats) {
Job job = jobDao.findById(TenantId.SYS_TENANT_ID, jobId.getId());
public void cancelJob(TenantId tenantId, JobId jobId) {
Job job = findForUpdate(tenantId, jobId);
if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.RUNNING) {
throw new IllegalArgumentException("Job already " + job.getStatus().name().toLowerCase());
}
job.getResult().setCancellationTs(System.currentTimeMillis());
saveJob(tenantId, job, true);
}
@Transactional
@Override
public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) {
Job job = findForUpdate(tenantId, jobId);
switch (job.getStatus()) {
case PENDING -> {
job.setStatus(JobStatus.RUNNING);
@ -73,26 +88,52 @@ public class DefaultJobService implements JobService {
jobResult.setTotalCount(jobStats.getTotalTasksCount());
}
boolean publishEvent = false;
for (TaskResult taskResult : jobStats.getTaskResults()) {
if (taskResult.isSuccess()) {
jobResult.setSuccessfulCount(jobResult.getSuccessfulCount() + 1);
} else if (taskResult.isCancelled()) {
jobResult.setCancelledCount(jobResult.getCancelledCount() + 1);
} else {
TaskFailure failure = taskResult.getFailure();
String key = failure.getTask().getKey();
jobResult.setFailedCount(jobResult.getFailedCount() + 1);
jobResult.getFailures().put(key, failure.getError());
}
if (jobResult.getCancellationTs() > 0) {
if (!taskResult.isCancelled() && System.currentTimeMillis() > jobResult.getCancellationTs()) {
log.info("Got task result for cancelled job {}: {}, re-notifying processors about cancellation", jobId, taskResult);
// task processor forgot the task is cancelled
publishEvent = true;
}
}
}
if (jobResult.getTotalCount() != null && jobResult.getSuccessfulCount() + jobResult.getFailedCount() >= jobResult.getTotalCount()) {
if (jobResult.getFailures().isEmpty()) {
job.setStatus(JobStatus.COMPLETED);
} else {
if (jobResult.getTotalCount() != null && jobResult.getCompletedCount() >= jobResult.getTotalCount()) {
if (jobResult.getCancellationTs() > 0) {
job.setStatus(JobStatus.CANCELLED);
} else if (jobResult.getFailedCount() > 0) {
job.setStatus(JobStatus.FAILED);
} else {
job.setStatus(JobStatus.COMPLETED);
}
}
log.info("Saving job {}", job);
jobDao.save(TenantId.SYS_TENANT_ID, job);
saveJob(tenantId, job, publishEvent);
}
private Job saveJob(TenantId tenantId, Job job, boolean publishEvent) {
job = jobDao.save(tenantId, job);
if (publishEvent) {
eventPublisher.publishEvent(SaveEntityEvent.builder()
.tenantId(tenantId)
.entityId(job.getId())
.entity(job)
.created(false)
.build());
}
return job;
}
@Override
@ -100,6 +141,10 @@ public class DefaultJobService implements JobService {
return jobDao.findByTenantId(tenantId, pageLink);
}
private Job findForUpdate(TenantId tenantId, JobId jobId) {
return jobDao.findByIdForUpdate(tenantId, jobId);
}
// todo: cancellation, reprocessing
public class JobValidator extends DataValidator<Job> {

View File

@ -28,9 +28,7 @@ public interface JobDao extends Dao<Job> {
PageData<Job> findByTenantId(TenantId tenantId, PageLink pageLink);
boolean reportTaskSuccess(JobId jobId, int tasksCount);
boolean reportTaskFailure(JobId jobId, String taskKey, String error);
Job findByIdForUpdate(TenantId tenantId, JobId jobId);
boolean existsByKeyAndStatusOneOf(String key, JobStatus... statuses);