Jobs reprocessing
This commit is contained in:
parent
1562fc19d9
commit
479ff8e25e
@ -82,4 +82,11 @@ public class JobController extends BaseController {
|
|||||||
jobManager.cancelJob(getTenantId(), new JobId(id));
|
jobManager.cancelJob(getTenantId(), new JobId(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/job/{id}/reprocess")
|
||||||
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
|
||||||
|
public void reprocessJob(@PathVariable UUID id) throws ThingsboardException {
|
||||||
|
// todo check permissions
|
||||||
|
jobManager.reprocessJob(getTenantId(), new JobId(id));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,19 +24,24 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.job.CfReprocessingJobConfiguration;
|
import org.thingsboard.server.common.data.job.CfReprocessingJobConfiguration;
|
||||||
import org.thingsboard.server.common.data.job.CfReprocessingTask;
|
import org.thingsboard.server.common.data.job.CfReprocessingTask;
|
||||||
|
import org.thingsboard.server.common.data.job.CfReprocessingTask.CfReprocessingTaskFailure;
|
||||||
import org.thingsboard.server.common.data.job.Job;
|
import org.thingsboard.server.common.data.job.Job;
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.Task;
|
import org.thingsboard.server.common.data.job.Task;
|
||||||
|
import org.thingsboard.server.common.data.job.TaskFailure;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||||
import org.thingsboard.server.dao.asset.AssetService;
|
import org.thingsboard.server.dao.asset.AssetService;
|
||||||
|
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||||
import org.thingsboard.server.dao.device.DeviceService;
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class CfReprocessingJobProcessor implements JobProcessor {
|
public class CfReprocessingJobProcessor implements JobProcessor {
|
||||||
|
|
||||||
|
private final CalculatedFieldService calculatedFieldService;
|
||||||
private final DeviceService deviceService;
|
private final DeviceService deviceService;
|
||||||
private final AssetService assetService;
|
private final AssetService assetService;
|
||||||
|
|
||||||
@ -44,12 +49,12 @@ public class CfReprocessingJobProcessor implements JobProcessor {
|
|||||||
public int process(Job job, Consumer<Task> taskConsumer) throws Exception {
|
public int process(Job job, Consumer<Task> taskConsumer) throws Exception {
|
||||||
CfReprocessingJobConfiguration configuration = job.getConfiguration();
|
CfReprocessingJobConfiguration configuration = job.getConfiguration();
|
||||||
|
|
||||||
CalculatedField calculatedField = configuration.getCalculatedField();
|
CalculatedField calculatedField = calculatedFieldService.findById(job.getTenantId(), configuration.getCalculatedFieldId());
|
||||||
EntityId cfEntityId = calculatedField.getEntityId();
|
EntityId cfEntityId = calculatedField.getEntityId();
|
||||||
|
|
||||||
int tasksCount = 0;
|
int tasksCount = 0;
|
||||||
if (cfEntityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) {
|
if (cfEntityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) {
|
||||||
taskConsumer.accept(createTask(job, configuration, cfEntityId));
|
taskConsumer.accept(createTask(job, configuration, calculatedField, cfEntityId));
|
||||||
tasksCount++;
|
tasksCount++;
|
||||||
} else {
|
} else {
|
||||||
PageDataIterable<? extends EntityId> entities;
|
PageDataIterable<? extends EntityId> entities;
|
||||||
@ -61,20 +66,31 @@ public class CfReprocessingJobProcessor implements JobProcessor {
|
|||||||
throw new IllegalArgumentException("Unsupported CF entity type " + cfEntityId.getEntityType());
|
throw new IllegalArgumentException("Unsupported CF entity type " + cfEntityId.getEntityType());
|
||||||
}
|
}
|
||||||
for (EntityId entityId : entities) {
|
for (EntityId entityId : entities) {
|
||||||
taskConsumer.accept(createTask(job, configuration, entityId));
|
taskConsumer.accept(createTask(job, configuration, calculatedField, entityId));
|
||||||
tasksCount++;
|
tasksCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tasksCount;
|
return tasksCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task createTask(Job job, CfReprocessingJobConfiguration configuration, EntityId entityId) {
|
@Override
|
||||||
|
public void reprocess(Job job, List<TaskFailure> failures, Consumer<Task> taskConsumer) throws Exception {
|
||||||
|
CfReprocessingJobConfiguration configuration = job.getConfiguration();
|
||||||
|
CalculatedField calculatedField = calculatedFieldService.findById(job.getTenantId(), configuration.getCalculatedFieldId());
|
||||||
|
|
||||||
|
for (TaskFailure failure : failures) {
|
||||||
|
CfReprocessingTaskFailure taskFailure = (CfReprocessingTaskFailure) failure;
|
||||||
|
EntityId entityId = taskFailure.getEntityId();
|
||||||
|
taskConsumer.accept(createTask(job, job.getConfiguration(), calculatedField, entityId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task createTask(Job job, CfReprocessingJobConfiguration configuration, CalculatedField calculatedField, EntityId entityId) {
|
||||||
return CfReprocessingTask.builder()
|
return CfReprocessingTask.builder()
|
||||||
.tenantId(job.getTenantId())
|
.tenantId(job.getTenantId())
|
||||||
.jobId(job.getId())
|
.jobId(job.getId())
|
||||||
.key(entityId.getEntityType().getNormalName() + " " + entityId.getId())
|
|
||||||
.retries(2) // 3 attempts in total
|
.retries(2) // 3 attempts in total
|
||||||
.calculatedField(configuration.getCalculatedField())
|
.calculatedField(calculatedField)
|
||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.startTs(configuration.getStartTs())
|
.startTs(configuration.getStartTs())
|
||||||
.endTs(configuration.getEndTs())
|
.endTs(configuration.getEndTs())
|
||||||
|
|||||||
@ -27,10 +27,12 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
|||||||
import org.thingsboard.server.common.data.id.JobId;
|
import org.thingsboard.server.common.data.id.JobId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.job.Job;
|
import org.thingsboard.server.common.data.job.Job;
|
||||||
|
import org.thingsboard.server.common.data.job.JobResult;
|
||||||
import org.thingsboard.server.common.data.job.JobStats;
|
import org.thingsboard.server.common.data.job.JobStats;
|
||||||
import org.thingsboard.server.common.data.job.JobStatus;
|
import org.thingsboard.server.common.data.job.JobStatus;
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.Task;
|
import org.thingsboard.server.common.data.job.Task;
|
||||||
|
import org.thingsboard.server.common.data.job.TaskFailure;
|
||||||
import org.thingsboard.server.common.data.job.TaskResult;
|
import org.thingsboard.server.common.data.job.TaskResult;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.dao.job.JobService;
|
import org.thingsboard.server.dao.job.JobService;
|
||||||
@ -48,6 +50,7 @@ import org.thingsboard.server.queue.util.AfterStartUp;
|
|||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -98,19 +101,32 @@ public class DefaultJobManager implements JobManager {
|
|||||||
@Override
|
@Override
|
||||||
public Job submitJob(Job job) {
|
public Job submitJob(Job job) {
|
||||||
log.debug("Submitting job: {}", job);
|
log.debug("Submitting job: {}", job);
|
||||||
return jobService.createJob(job.getTenantId(), job);
|
return jobService.submitJob(job.getTenantId(), job);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onJobUpdate(Job job) {
|
public void onJobUpdate(Job job) {
|
||||||
if (job.getStatus() == JobStatus.PENDING) {
|
if (job.getStatus() == JobStatus.PENDING) {
|
||||||
executor.execute(() -> {
|
executor.execute(() -> {
|
||||||
|
processJob(job);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processJob(Job job) {
|
||||||
TenantId tenantId = job.getTenantId();
|
TenantId tenantId = job.getTenantId();
|
||||||
JobId jobId = job.getId();
|
JobId jobId = job.getId();
|
||||||
try {
|
try {
|
||||||
int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted
|
JobProcessor processor = jobProcessors.get(job.getType());
|
||||||
|
List<TaskFailure> toReprocess = job.getConfiguration().getToReprocess();
|
||||||
|
if (toReprocess == null) {
|
||||||
|
int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted
|
||||||
log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount);
|
log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount);
|
||||||
jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount);
|
jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount);
|
||||||
|
} else {
|
||||||
|
processor.reprocess(job, toReprocess, this::submitTask);
|
||||||
|
log.info("[{}][{}][{}] Submitted {} tasks for reprocessing", tenantId, jobId, job.getType(), toReprocess.size());
|
||||||
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e);
|
log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e);
|
||||||
try {
|
try {
|
||||||
@ -119,8 +135,6 @@ public class DefaultJobManager implements JobManager {
|
|||||||
log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2);
|
log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -129,6 +143,31 @@ public class DefaultJobManager implements JobManager {
|
|||||||
jobService.cancelJob(tenantId, jobId);
|
jobService.cancelJob(tenantId, jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reprocessJob(TenantId tenantId, JobId jobId) {
|
||||||
|
log.info("[{}][{}] Reprocessing job", tenantId, jobId);
|
||||||
|
Job job = jobService.findJobById(tenantId, jobId);
|
||||||
|
if (job.getStatus() != JobStatus.FAILED) {
|
||||||
|
throw new IllegalArgumentException("Job is not failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
JobResult result = job.getResult();
|
||||||
|
if (result.getGeneralError() != null) {
|
||||||
|
throw new IllegalArgumentException("Reprocessing not allowed since job has general error");
|
||||||
|
}
|
||||||
|
List<TaskFailure> failures = result.getFailures();
|
||||||
|
if (result.getFailedCount() > failures.size()) {
|
||||||
|
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + failures.size() + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
result.setFailedCount(0);
|
||||||
|
result.setFailures(Collections.emptyList());
|
||||||
|
|
||||||
|
job.getConfiguration().setToReprocess(failures);
|
||||||
|
|
||||||
|
jobService.submitJob(tenantId, job);
|
||||||
|
}
|
||||||
|
|
||||||
private void submitTask(Task task) {
|
private void submitTask(Task task) {
|
||||||
log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task);
|
log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task);
|
||||||
TaskProto taskProto = TaskProto.newBuilder()
|
TaskProto taskProto = TaskProto.newBuilder()
|
||||||
|
|||||||
@ -19,10 +19,13 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
|
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
|
||||||
import org.thingsboard.server.common.data.job.DummyTask;
|
import org.thingsboard.server.common.data.job.DummyTask;
|
||||||
|
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure;
|
||||||
import org.thingsboard.server.common.data.job.Job;
|
import org.thingsboard.server.common.data.job.Job;
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.Task;
|
import org.thingsboard.server.common.data.job.Task;
|
||||||
|
import org.thingsboard.server.common.data.job.TaskFailure;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
@ -35,31 +38,48 @@ public class DummyJobProcessor implements JobProcessor {
|
|||||||
DummyJobConfiguration configuration = job.getConfiguration();
|
DummyJobConfiguration configuration = job.getConfiguration();
|
||||||
if (configuration.getGeneralError() != null) {
|
if (configuration.getGeneralError() != null) {
|
||||||
for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) {
|
for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) {
|
||||||
taskConsumer.accept(createTask(job, configuration, number, null));
|
taskConsumer.accept(createTask(job, configuration, number, null, false));
|
||||||
}
|
}
|
||||||
Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed
|
Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed
|
||||||
throw new RuntimeException(configuration.getGeneralError());
|
throw new RuntimeException(configuration.getGeneralError());
|
||||||
}
|
}
|
||||||
for (int number = 1; number <= configuration.getSuccessfulTasksCount(); number++) {
|
|
||||||
taskConsumer.accept(createTask(job, configuration, number, null));
|
int taskNumber = 1;
|
||||||
|
for (int i = 0; i < configuration.getSuccessfulTasksCount(); i++) {
|
||||||
|
taskConsumer.accept(createTask(job, configuration, taskNumber, null, false));
|
||||||
|
taskNumber++;
|
||||||
}
|
}
|
||||||
if (configuration.getErrors() != null) {
|
if (configuration.getErrors() != null) {
|
||||||
for (int number = 1; number <= configuration.getFailedTasksCount(); number++) {
|
for (int i = 0; i < configuration.getFailedTasksCount(); i++) {
|
||||||
taskConsumer.accept(createTask(job, configuration, number, configuration.getErrors()));
|
taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), false));
|
||||||
|
taskNumber++;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < configuration.getPermanentlyFailedTasksCount(); i++) {
|
||||||
|
taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), true));
|
||||||
|
taskNumber++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return configuration.getSuccessfulTasksCount() + configuration.getFailedTasksCount();
|
return configuration.getSuccessfulTasksCount() + configuration.getFailedTasksCount() + configuration.getPermanentlyFailedTasksCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task createTask(Job job, DummyJobConfiguration configuration, int number, List<String> errors) {
|
@Override
|
||||||
|
public void reprocess(Job job, List<TaskFailure> failures, Consumer<Task> taskConsumer) throws Exception {
|
||||||
|
for (TaskFailure failure : failures) {
|
||||||
|
DummyTaskFailure taskFailure = (DummyTaskFailure) failure;
|
||||||
|
taskConsumer.accept(createTask(job, job.getConfiguration(), taskFailure.getNumber(), taskFailure.isFailAlways() ?
|
||||||
|
List.of(taskFailure.getError()) : Collections.emptyList(), taskFailure.isFailAlways()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task createTask(Job job, DummyJobConfiguration configuration, int number, List<String> errors, boolean failAlways) {
|
||||||
return DummyTask.builder()
|
return DummyTask.builder()
|
||||||
.tenantId(job.getTenantId())
|
.tenantId(job.getTenantId())
|
||||||
.jobId(job.getId())
|
.jobId(job.getId())
|
||||||
.key("Task " + number)
|
|
||||||
.retries(configuration.getRetries())
|
.retries(configuration.getRetries())
|
||||||
.number(number)
|
.number(number)
|
||||||
.processingTimeMs(configuration.getTaskProcessingTimeMs())
|
.processingTimeMs(configuration.getTaskProcessingTimeMs())
|
||||||
.errors(errors)
|
.errors(errors)
|
||||||
|
.failAlways(failAlways)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -25,6 +25,8 @@ public interface JobManager {
|
|||||||
|
|
||||||
void cancelJob(TenantId tenantId, JobId jobId);
|
void cancelJob(TenantId tenantId, JobId jobId);
|
||||||
|
|
||||||
|
void reprocessJob(TenantId tenantId, JobId jobId);
|
||||||
|
|
||||||
void onJobUpdate(Job job);
|
void onJobUpdate(Job job);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,13 +18,17 @@ package org.thingsboard.server.service.job;
|
|||||||
import org.thingsboard.server.common.data.job.Job;
|
import org.thingsboard.server.common.data.job.Job;
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.Task;
|
import org.thingsboard.server.common.data.job.Task;
|
||||||
|
import org.thingsboard.server.common.data.job.TaskFailure;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
public interface JobProcessor {
|
public interface JobProcessor {
|
||||||
|
|
||||||
int process(Job job, Consumer<Task> taskConsumer) throws Exception;
|
int process(Job job, Consumer<Task> taskConsumer) throws Exception;
|
||||||
|
|
||||||
|
void reprocess(Job job, List<TaskFailure> failures, Consumer<Task> taskConsumer) throws Exception;
|
||||||
|
|
||||||
JobType getType();
|
JobType getType();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,6 +30,9 @@ public class DummyTaskProcessor extends TaskProcessor<DummyTask> {
|
|||||||
if (task.getProcessingTimeMs() > 0) {
|
if (task.getProcessingTimeMs() > 0) {
|
||||||
Thread.sleep(task.getProcessingTimeMs());
|
Thread.sleep(task.getProcessingTimeMs());
|
||||||
}
|
}
|
||||||
|
if (task.isFailAlways()) {
|
||||||
|
throw new RuntimeException(task.getErrors().get(0));
|
||||||
|
}
|
||||||
if (task.getErrors() != null && task.getAttempt() <= task.getErrors().size()) {
|
if (task.getErrors() != null && task.getAttempt() <= task.getErrors().size()) {
|
||||||
String error = task.getErrors().get(task.getAttempt() - 1);
|
String error = task.getErrors().get(task.getAttempt() - 1);
|
||||||
throw new RuntimeException(error);
|
throw new RuntimeException(error);
|
||||||
|
|||||||
@ -1264,4 +1264,12 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
|||||||
return doGetTypedWithPageLink("/api/jobs?", new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
|
return doGetTypedWithPageLink("/api/jobs?", new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void cancelJob(JobId jobId) throws Exception {
|
||||||
|
doPost("/api/job/" + jobId + "/cancel").andExpect(status().isOk());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void reprocessJob(JobId jobId) throws Exception {
|
||||||
|
doPost("/api/job/" + jobId + "/reprocess").andExpect(status().isOk());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import org.springframework.test.context.TestPropertySource;
|
|||||||
import org.thingsboard.server.common.data.id.JobId;
|
import org.thingsboard.server.common.data.id.JobId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
|
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
|
||||||
|
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure;
|
||||||
import org.thingsboard.server.common.data.job.Job;
|
import org.thingsboard.server.common.data.job.Job;
|
||||||
import org.thingsboard.server.common.data.job.JobResult;
|
import org.thingsboard.server.common.data.job.JobResult;
|
||||||
import org.thingsboard.server.common.data.job.JobStatus;
|
import org.thingsboard.server.common.data.job.JobStatus;
|
||||||
@ -130,8 +131,8 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
|
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
|
||||||
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
|
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
|
||||||
assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks);
|
assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks);
|
||||||
assertThat(jobResult.getFailures().get("Task 1")).isEqualTo("error3"); // last error
|
assertThat(jobResult.getFailures().get(0).getError()).isEqualTo("error3"); // last error
|
||||||
assertThat(jobResult.getFailures().get("Task 2")).isEqualTo("error3"); // last error
|
assertThat(jobResult.getFailures().get(1).getError()).isEqualTo("error3"); // last error
|
||||||
assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
|
assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -151,7 +152,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
.build()).getId();
|
.build()).getId();
|
||||||
|
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
jobManager.cancelJob(tenantId, jobId);
|
cancelJob(jobId);
|
||||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
Job job = findJobById(jobId);
|
Job job = findJobById(jobId);
|
||||||
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
|
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
|
||||||
@ -163,7 +164,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelJob_simulateTaskProcessorRestart() {
|
public void testCancelJob_simulateTaskProcessorRestart() throws Exception {
|
||||||
int tasksCount = 10;
|
int tasksCount = 10;
|
||||||
JobId jobId = jobManager.submitJob(Job.builder()
|
JobId jobId = jobManager.submitJob(Job.builder()
|
||||||
.tenantId(tenantId)
|
.tenantId(tenantId)
|
||||||
@ -184,7 +185,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event,
|
}).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event,
|
||||||
jobManager.cancelJob(tenantId, jobId);
|
cancelJob(jobId);
|
||||||
|
|
||||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
Job job = findJobById(jobId);
|
Job job = findJobById(jobId);
|
||||||
@ -262,7 +263,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelQueuedJob() {
|
public void testCancelQueuedJob() throws Exception {
|
||||||
int tasksCount = 3;
|
int tasksCount = 3;
|
||||||
int jobsCount = 3;
|
int jobsCount = 3;
|
||||||
List<JobId> jobIds = new ArrayList<>();
|
List<JobId> jobIds = new ArrayList<>();
|
||||||
@ -281,7 +282,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 1; i < jobIds.size(); i++) {
|
for (int i = 1; i < jobIds.size(); i++) {
|
||||||
jobManager.cancelJob(tenantId, jobIds.get(i));
|
cancelJob(jobIds.get(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
@ -326,6 +327,104 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: job with zero tasks, reprocessing
|
@Test
|
||||||
|
public void testJobReprocessing() throws Exception {
|
||||||
|
int successfulTasks = 3;
|
||||||
|
int failedTasks = 2;
|
||||||
|
int totalTasksCount = successfulTasks + failedTasks;
|
||||||
|
JobId jobId = jobManager.submitJob(Job.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.type(JobType.DUMMY)
|
||||||
|
.key("test-job")
|
||||||
|
.description("test job")
|
||||||
|
.configuration(DummyJobConfiguration.builder()
|
||||||
|
.successfulTasksCount(successfulTasks)
|
||||||
|
.failedTasksCount(failedTasks)
|
||||||
|
.errors(List.of("error"))
|
||||||
|
.taskProcessingTimeMs(100)
|
||||||
|
.build())
|
||||||
|
.build()).getId();
|
||||||
|
|
||||||
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
Job job = findJobById(jobId);
|
||||||
|
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||||
|
JobResult jobResult = job.getResult();
|
||||||
|
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
|
||||||
|
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
|
||||||
|
|
||||||
|
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
|
||||||
|
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i);
|
||||||
|
assertThat(failure.getNumber()).isEqualTo(taskNumber);
|
||||||
|
assertThat(failure.getError()).isEqualTo("error");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reprocessJob(jobId);
|
||||||
|
|
||||||
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
Job job = findJobById(jobId);
|
||||||
|
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED);
|
||||||
|
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount);
|
||||||
|
assertThat(job.getResult().getFailedCount()).isZero();
|
||||||
|
assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount);
|
||||||
|
assertThat(job.getResult().getFailures()).isEmpty();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJobReprocessing_somePermanentlyFailed() throws Exception {
|
||||||
|
int successfulTasks = 3;
|
||||||
|
int failedTasks = 2;
|
||||||
|
int permanentlyFailedTasks = 1;
|
||||||
|
int totalTasksCount = successfulTasks + failedTasks + permanentlyFailedTasks;
|
||||||
|
JobId jobId = jobManager.submitJob(Job.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.type(JobType.DUMMY)
|
||||||
|
.key("test-job")
|
||||||
|
.description("test job")
|
||||||
|
.configuration(DummyJobConfiguration.builder()
|
||||||
|
.successfulTasksCount(successfulTasks)
|
||||||
|
.failedTasksCount(failedTasks)
|
||||||
|
.permanentlyFailedTasksCount(permanentlyFailedTasks)
|
||||||
|
.errors(List.of("error"))
|
||||||
|
.taskProcessingTimeMs(100)
|
||||||
|
.build())
|
||||||
|
.build()).getId();
|
||||||
|
|
||||||
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
Job job = findJobById(jobId);
|
||||||
|
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||||
|
JobResult jobResult = job.getResult();
|
||||||
|
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
|
||||||
|
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks + permanentlyFailedTasks);
|
||||||
|
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount);
|
||||||
|
|
||||||
|
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
|
||||||
|
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i);
|
||||||
|
assertThat(failure.getNumber()).isEqualTo(taskNumber);
|
||||||
|
assertThat(failure.getError()).isEqualTo("error");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reprocessJob(jobId);
|
||||||
|
|
||||||
|
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
Job job = findJobById(jobId);
|
||||||
|
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||||
|
JobResult jobResult = job.getResult();
|
||||||
|
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks + failedTasks);
|
||||||
|
assertThat(jobResult.getFailedCount()).isEqualTo(permanentlyFailedTasks);
|
||||||
|
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount);
|
||||||
|
|
||||||
|
for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
|
||||||
|
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i);
|
||||||
|
assertThat(failure.getNumber()).isEqualTo(taskNumber);
|
||||||
|
assertThat(failure.getError()).isEqualTo("error");
|
||||||
|
assertThat(failure.isFailAlways()).isTrue();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: job with zero tasks
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -25,7 +25,7 @@ import org.thingsboard.server.dao.entity.EntityDaoService;
|
|||||||
|
|
||||||
public interface JobService extends EntityDaoService {
|
public interface JobService extends EntityDaoService {
|
||||||
|
|
||||||
Job createJob(TenantId tenantId, Job job);
|
Job submitJob(TenantId tenantId, Job job);
|
||||||
|
|
||||||
Job findJobById(TenantId tenantId, JobId jobId);
|
Job findJobById(TenantId tenantId, JobId jobId);
|
||||||
|
|
||||||
|
|||||||
@ -19,17 +19,19 @@ import jakarta.validation.constraints.NotNull;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@Builder
|
@Builder
|
||||||
public class CfReprocessingJobConfiguration implements JobConfiguration {
|
public class CfReprocessingJobConfiguration extends JobConfiguration {
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
private CalculatedField calculatedField;
|
private CalculatedFieldId calculatedFieldId;
|
||||||
private long startTs;
|
private long startTs;
|
||||||
private long endTs;
|
private long endTs;
|
||||||
|
|
||||||
|
|||||||
@ -35,9 +35,38 @@ public class CfReprocessingTask extends Task {
|
|||||||
private long startTs;
|
private long startTs;
|
||||||
private long endTs;
|
private long endTs;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getKey() {
|
||||||
|
return entityId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskFailure toFailure(Throwable error) {
|
||||||
|
return new CfReprocessingTaskFailure(entityId, error.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobType getJobType() {
|
public JobType getJobType() {
|
||||||
return JobType.CF_REPROCESSING;
|
return JobType.CF_REPROCESSING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
|
@NoArgsConstructor
|
||||||
|
public static class CfReprocessingTaskFailure extends TaskFailure {
|
||||||
|
|
||||||
|
private EntityId entityId;
|
||||||
|
|
||||||
|
public CfReprocessingTaskFailure(EntityId entityId, String error) {
|
||||||
|
super(error);
|
||||||
|
this.entityId = entityId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JobType getJobType() {
|
||||||
|
return JobType.CF_REPROCESSING;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,19 +18,22 @@ package org.thingsboard.server.common.data.job;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@Builder
|
@Builder
|
||||||
public class DummyJobConfiguration implements JobConfiguration {
|
public class DummyJobConfiguration extends JobConfiguration {
|
||||||
|
|
||||||
private long taskProcessingTimeMs;
|
private long taskProcessingTimeMs;
|
||||||
private int successfulTasksCount;
|
private int successfulTasksCount;
|
||||||
private int failedTasksCount;
|
private int failedTasksCount;
|
||||||
|
private int permanentlyFailedTasksCount;
|
||||||
private List<String> errors;
|
private List<String> errors;
|
||||||
private int retries;
|
private int retries;
|
||||||
|
|
||||||
|
|||||||
@ -33,10 +33,42 @@ public class DummyTask extends Task {
|
|||||||
private int number;
|
private int number;
|
||||||
private long processingTimeMs;
|
private long processingTimeMs;
|
||||||
private List<String> errors; // errors for each attempt
|
private List<String> errors; // errors for each attempt
|
||||||
|
private boolean failAlways;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getKey() {
|
||||||
|
return number;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskFailure toFailure(Throwable error) {
|
||||||
|
return new DummyTaskFailure(number, failAlways, error.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobType getJobType() {
|
public JobType getJobType() {
|
||||||
return JobType.DUMMY;
|
return JobType.DUMMY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
|
@NoArgsConstructor
|
||||||
|
public static class DummyTaskFailure extends TaskFailure {
|
||||||
|
|
||||||
|
private int number;
|
||||||
|
private boolean failAlways;
|
||||||
|
|
||||||
|
public DummyTaskFailure(int number, boolean failAlways, String error) {
|
||||||
|
super(error);
|
||||||
|
this.number = number;
|
||||||
|
this.failAlways = failAlways;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JobType getJobType() {
|
||||||
|
return JobType.DUMMY;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,8 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||||
@ -28,8 +30,11 @@ import java.io.Serializable;
|
|||||||
@Type(name = "CF_REPROCESSING", value = CfReprocessingJobConfiguration.class),
|
@Type(name = "CF_REPROCESSING", value = CfReprocessingJobConfiguration.class),
|
||||||
@Type(name = "DUMMY", value = DummyJobConfiguration.class),
|
@Type(name = "DUMMY", value = DummyJobConfiguration.class),
|
||||||
})
|
})
|
||||||
public interface JobConfiguration extends Serializable {
|
@Data
|
||||||
|
public abstract class JobConfiguration implements Serializable {
|
||||||
|
|
||||||
JobType getType();
|
private List<TaskFailure> toReprocess;
|
||||||
|
|
||||||
|
public abstract JobType getType();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,8 +24,8 @@ import lombok.Data;
|
|||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.ArrayList;
|
||||||
import java.util.Map;
|
import java.util.List;
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
|
||||||
@ -41,7 +41,7 @@ public abstract class JobResult implements Serializable {
|
|||||||
private int failedCount;
|
private int failedCount;
|
||||||
private int discardedCount;
|
private int discardedCount;
|
||||||
private Integer totalCount = null; // set when all tasks are submitted
|
private Integer totalCount = null; // set when all tasks are submitted
|
||||||
private Map<String, String> failures = new HashMap<>();
|
private List<TaskFailure> failures = new ArrayList<>();
|
||||||
private String generalError;
|
private String generalError;
|
||||||
|
|
||||||
private long cancellationTs;
|
private long cancellationTs;
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.common.data.job;
|
package org.thingsboard.server.common.data.job;
|
||||||
|
|
||||||
public enum JobStatus {
|
public enum JobStatus {
|
||||||
|
|
||||||
QUEUED,
|
QUEUED,
|
||||||
PENDING,
|
PENDING,
|
||||||
RUNNING,
|
RUNNING,
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.data.job;
|
package org.thingsboard.server.common.data.job;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
@ -38,7 +39,6 @@ public abstract class Task {
|
|||||||
|
|
||||||
private TenantId tenantId;
|
private TenantId tenantId;
|
||||||
private JobId jobId;
|
private JobId jobId;
|
||||||
private String key;
|
|
||||||
private int retries;
|
private int retries;
|
||||||
|
|
||||||
public Task() {
|
public Task() {
|
||||||
@ -46,6 +46,11 @@ public abstract class Task {
|
|||||||
|
|
||||||
private int attempt = 0;
|
private int attempt = 0;
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public abstract Object getKey();
|
||||||
|
|
||||||
|
public abstract TaskFailure toFailure(Throwable error);
|
||||||
|
|
||||||
public abstract JobType getJobType();
|
public abstract JobType getJobType();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,43 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2025 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.job;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.server.common.data.job.CfReprocessingTask.CfReprocessingTaskFailure;
|
||||||
|
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
|
||||||
|
@JsonSubTypes({
|
||||||
|
@Type(name = "CF_REPROCESSING", value = CfReprocessingTaskFailure.class),
|
||||||
|
@Type(name = "DUMMY", value = DummyTaskFailure.class)
|
||||||
|
})
|
||||||
|
public abstract class TaskFailure {
|
||||||
|
|
||||||
|
private String error;
|
||||||
|
|
||||||
|
public abstract JobType getJobType();
|
||||||
|
|
||||||
|
}
|
||||||
@ -30,13 +30,4 @@ public class TaskResult {
|
|||||||
private boolean discarded;
|
private boolean discarded;
|
||||||
private TaskFailure failure;
|
private TaskFailure failure;
|
||||||
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
@Builder
|
|
||||||
public static class TaskFailure {
|
|
||||||
private String error;
|
|
||||||
private Task task;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.EntityId;
|
|||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.Task;
|
import org.thingsboard.server.common.data.job.Task;
|
||||||
import org.thingsboard.server.common.data.job.TaskResult;
|
import org.thingsboard.server.common.data.job.TaskResult;
|
||||||
import org.thingsboard.server.common.data.job.TaskResult.TaskFailure;
|
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
|
||||||
@ -147,10 +146,7 @@ public abstract class TaskProcessor<T extends Task> {
|
|||||||
|
|
||||||
private void reportFailure(Task task, Throwable error) {
|
private void reportFailure(Task task, Throwable error) {
|
||||||
TaskResult result = TaskResult.builder()
|
TaskResult result = TaskResult.builder()
|
||||||
.failure(TaskFailure.builder()
|
.failure(task.toFailure(error))
|
||||||
.error(error.getMessage())
|
|
||||||
.task(task)
|
|
||||||
.build())
|
|
||||||
.build();
|
.build();
|
||||||
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
|
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,12 +30,11 @@ import org.thingsboard.server.common.data.job.JobStats;
|
|||||||
import org.thingsboard.server.common.data.job.JobStatus;
|
import org.thingsboard.server.common.data.job.JobStatus;
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
import org.thingsboard.server.common.data.job.TaskResult;
|
import org.thingsboard.server.common.data.job.TaskResult;
|
||||||
import org.thingsboard.server.common.data.job.TaskResult.TaskFailure;
|
import org.thingsboard.server.common.data.job.TaskFailure;
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
import org.thingsboard.server.common.data.page.PageLink;
|
import org.thingsboard.server.common.data.page.PageLink;
|
||||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
|
import org.thingsboard.server.dao.entity.AbstractEntityService;
|
||||||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
||||||
import org.thingsboard.server.dao.service.DataValidator;
|
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
@ -52,12 +51,13 @@ import static org.thingsboard.server.common.data.job.JobStatus.RUNNING;
|
|||||||
public class DefaultJobService extends AbstractEntityService implements JobService {
|
public class DefaultJobService extends AbstractEntityService implements JobService {
|
||||||
|
|
||||||
private final JobDao jobDao;
|
private final JobDao jobDao;
|
||||||
private final JobValidator validator = new JobValidator();
|
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
@Override
|
@Override
|
||||||
public Job createJob(TenantId tenantId, Job job) {
|
public Job submitJob(TenantId tenantId, Job job) {
|
||||||
validator.validate(job, Job::getTenantId);
|
if (jobDao.existsByKeyAndStatusOneOf(job.getKey(), QUEUED, PENDING, RUNNING)) {
|
||||||
|
throw new IllegalArgumentException("The same job is already queued or running");
|
||||||
|
}
|
||||||
if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) {
|
if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) {
|
||||||
job.setStatus(QUEUED);
|
job.setStatus(QUEUED);
|
||||||
} else {
|
} else {
|
||||||
@ -124,10 +124,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
result.setDiscardedCount(result.getDiscardedCount() + 1);
|
result.setDiscardedCount(result.getDiscardedCount() + 1);
|
||||||
} else {
|
} else {
|
||||||
TaskFailure failure = taskResult.getFailure();
|
TaskFailure failure = taskResult.getFailure();
|
||||||
String key = failure.getTask().getKey();
|
|
||||||
result.setFailedCount(result.getFailedCount() + 1);
|
result.setFailedCount(result.getFailedCount() + 1);
|
||||||
if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures
|
if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures
|
||||||
result.getFailures().put(key, failure.getError());
|
result.getFailures().add(failure);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,24 +191,6 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
return jobDao.findByIdForUpdate(tenantId, jobId);
|
return jobDao.findByIdForUpdate(tenantId, jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: reprocessing
|
|
||||||
|
|
||||||
public class JobValidator extends DataValidator<Job> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void validateCreate(TenantId tenantId, Job job) {
|
|
||||||
// if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) {
|
|
||||||
// throw new DataValidationException("Job of this type is already running");
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Job validateUpdate(TenantId tenantId, Job job) {
|
|
||||||
throw new IllegalArgumentException("Job can't be updated externally");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<HasId<?>> findEntity(TenantId tenantId, EntityId entityId) {
|
public Optional<HasId<?>> findEntity(TenantId tenantId, EntityId entityId) {
|
||||||
return Optional.ofNullable(findJobById(tenantId, (JobId) entityId));
|
return Optional.ofNullable(findJobById(tenantId, (JobId) entityId));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user