Refactor job and task results

This commit is contained in:
ViacheslavKlimov 2025-05-01 13:41:36 +03:00
parent 4057e6b42e
commit 9e878923e1
16 changed files with 169 additions and 109 deletions

View File

@ -31,9 +31,8 @@ import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
@ -50,7 +49,6 @@ import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -118,7 +116,7 @@ public class DefaultJobManager implements JobManager {
JobId jobId = job.getId(); JobId jobId = job.getId();
try { try {
JobProcessor processor = jobProcessors.get(job.getType()); JobProcessor processor = jobProcessors.get(job.getType());
List<TaskFailure> toReprocess = job.getConfiguration().getToReprocess(); List<TaskResult> toReprocess = job.getConfiguration().getToReprocess();
if (toReprocess == null) { if (toReprocess == null) {
int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted
log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount);
@ -155,20 +153,24 @@ public class DefaultJobManager implements JobManager {
if (result.getGeneralError() != null) { if (result.getGeneralError() != null) {
throw new IllegalArgumentException("Reprocessing not allowed since job has general error"); throw new IllegalArgumentException("Reprocessing not allowed since job has general error");
} }
List<TaskFailure> failures = result.getFailures(); List<TaskResult> taskFailures = result.getResults().stream()
if (result.getFailedCount() > failures.size()) { .filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded())
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + failures.size() + ")"); .toList();
if (result.getFailedCount() > taskFailures.size()) {
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")");
} }
result.setFailedCount(0); result.setFailedCount(0);
result.setFailures(Collections.emptyList()); result.setResults(result.getResults().stream()
.filter(TaskResult::isSuccess)
.toList());
job.getConfiguration().setToReprocess(failures); job.getConfiguration().setToReprocess(taskFailures);
jobService.submitJob(tenantId, job); jobService.submitJob(tenantId, job);
} }
private void submitTask(Task task) { private void submitTask(Task<?> task) {
log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task);
TaskProto taskProto = TaskProto.newBuilder() TaskProto taskProto = TaskProto.newBuilder()
.setValue(JacksonUtil.toString(task)) .setValue(JacksonUtil.toString(task))

View File

@ -18,12 +18,13 @@ package org.thingsboard.server.service.job;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.DummyJobConfiguration; import org.thingsboard.server.common.data.job.DummyJobConfiguration;
import org.thingsboard.server.common.data.job.DummyTask;
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure;
import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.task.DummyTask;
import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure;
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.task.TaskResult;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -34,7 +35,7 @@ import java.util.function.Consumer;
public class DummyJobProcessor implements JobProcessor { public class DummyJobProcessor implements JobProcessor {
@Override @Override
public int process(Job job, Consumer<Task> taskConsumer) throws Exception { public int process(Job job, Consumer<Task<?>> taskConsumer) throws Exception {
DummyJobConfiguration configuration = job.getConfiguration(); DummyJobConfiguration configuration = job.getConfiguration();
if (configuration.getGeneralError() != null) { if (configuration.getGeneralError() != null) {
for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) { for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) {
@ -63,15 +64,15 @@ public class DummyJobProcessor implements JobProcessor {
} }
@Override @Override
public void reprocess(Job job, List<TaskFailure> failures, Consumer<Task> taskConsumer) throws Exception { public void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception {
for (TaskFailure failure : failures) { for (TaskResult taskFailure : taskFailures) {
DummyTaskFailure taskFailure = (DummyTaskFailure) failure; DummyTaskFailure failure = ((DummyTaskResult) taskFailure).getFailure();
taskConsumer.accept(createTask(job, job.getConfiguration(), taskFailure.getNumber(), taskFailure.isFailAlways() ? taskConsumer.accept(createTask(job, job.getConfiguration(), failure.getNumber(), failure.isFailAlways() ?
List.of(taskFailure.getError()) : Collections.emptyList(), taskFailure.isFailAlways())); List.of(failure.getError()) : Collections.emptyList(), failure.isFailAlways()));
} }
} }
private Task createTask(Job job, DummyJobConfiguration configuration, int number, List<String> errors, boolean failAlways) { private DummyTask createTask(Job job, DummyJobConfiguration configuration, int number, List<String> errors, boolean failAlways) {
return DummyTask.builder() return DummyTask.builder()
.tenantId(job.getTenantId()) .tenantId(job.getTenantId())
.jobId(job.getId()) .jobId(job.getId())

View File

@ -17,17 +17,17 @@ package org.thingsboard.server.service.job;
import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.job.task.TaskResult;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
public interface JobProcessor { public interface JobProcessor {
int process(Job job, Consumer<Task> taskConsumer) throws Exception; int process(Job job, Consumer<Task<?>> taskConsumer) throws Exception;
void reprocess(Job job, List<TaskFailure> failures, Consumer<Task> taskConsumer) throws Exception; void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception;
JobType getType(); JobType getType();

View File

@ -17,16 +17,17 @@ package org.thingsboard.server.service.job.task;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.DummyTask; import org.thingsboard.server.common.data.job.task.DummyTask;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
import org.thingsboard.server.queue.task.TaskProcessor; import org.thingsboard.server.queue.task.TaskProcessor;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class DummyTaskProcessor extends TaskProcessor<DummyTask, Void> { public class DummyTaskProcessor extends TaskProcessor<DummyTask, DummyTaskResult> {
@Override @Override
public Void process(DummyTask task) throws Exception { public DummyTaskResult process(DummyTask task) throws Exception {
if (task.getProcessingTimeMs() > 0) { if (task.getProcessingTimeMs() > 0) {
Thread.sleep(task.getProcessingTimeMs()); Thread.sleep(task.getProcessingTimeMs());
} }
@ -37,7 +38,7 @@ public class DummyTaskProcessor extends TaskProcessor<DummyTask, Void> {
String error = task.getErrors().get(task.getAttempt() - 1); String error = task.getErrors().get(task.getAttempt() - 1);
throw new RuntimeException(error); throw new RuntimeException(error);
} }
return null; return DummyTaskResult.success();
} }
@Override @Override

View File

@ -26,11 +26,12 @@ import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.DummyJobConfiguration; import org.thingsboard.server.common.data.job.DummyJobConfiguration;
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure;
import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure;
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.job.JobService;
@ -101,7 +102,7 @@ public class JobManagerTest extends AbstractControllerTest {
Job job = findJobById(jobId); Job job = findJobById(jobId);
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED);
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getFailures()).isEmpty(); assertThat(job.getResult().getResults()).isEmpty();
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
}); });
} }
@ -131,8 +132,8 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks);
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks); assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks);
assertThat(jobResult.getFailures().get(0).getError()).isEqualTo("error3"); // last error assertThat(((DummyTaskResult) jobResult.getResults().get(0)).getFailure().getError()).isEqualTo("error3"); // last error
assertThat(jobResult.getFailures().get(1).getError()).isEqualTo("error3"); // last error assertThat(((DummyTaskResult) jobResult.getResults().get(1)).getFailure().getError()).isEqualTo("error3"); // last error
assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
}); });
} }
@ -353,7 +354,7 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks);
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure();
assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getNumber()).isEqualTo(taskNumber);
assertThat(failure.getError()).isEqualTo("error"); assertThat(failure.getError()).isEqualTo("error");
} }
@ -367,7 +368,7 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount); assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount);
assertThat(job.getResult().getFailedCount()).isZero(); assertThat(job.getResult().getFailedCount()).isZero();
assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount); assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount);
assertThat(job.getResult().getFailures()).isEmpty(); assertThat(job.getResult().getResults()).isEmpty();
}); });
} }
@ -400,7 +401,7 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount);
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure();
assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getNumber()).isEqualTo(taskNumber);
assertThat(failure.getError()).isEqualTo("error"); assertThat(failure.getError()).isEqualTo("error");
} }
@ -417,7 +418,7 @@ public class JobManagerTest extends AbstractControllerTest {
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount);
for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) {
DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure();
assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getNumber()).isEqualTo(taskNumber);
assertThat(failure.getError()).isEqualTo("error"); assertThat(failure.getError()).isEqualTo("error");
assertThat(failure.isFailAlways()).isTrue(); assertThat(failure.isFailAlways()).isTrue();

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.job.task.TaskResult;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
@ -32,7 +33,7 @@ import java.util.List;
@Data @Data
public abstract class JobConfiguration implements Serializable { public abstract class JobConfiguration implements Serializable {
private List<TaskFailure> toReprocess; private List<TaskResult> toReprocess;
public abstract JobType getType(); public abstract JobType getType();

View File

@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.job.task.TaskResult;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,7 +41,7 @@ public abstract class JobResult implements Serializable {
private int failedCount; private int failedCount;
private int discardedCount; private int discardedCount;
private Integer totalCount = null; // set when all tasks are submitted private Integer totalCount = null; // set when all tasks are submitted
private List<TaskFailure> failures = new ArrayList<>(); private List<TaskResult> results = new ArrayList<>();
private String generalError; private String generalError;
private long cancellationTs; private long cancellationTs;
@ -50,6 +51,19 @@ public abstract class JobResult implements Serializable {
return successfulCount + failedCount + discardedCount; return successfulCount + failedCount + discardedCount;
} }
public void processTaskResult(TaskResult taskResult) {
if (taskResult.isSuccess()) {
successfulCount++;
} else if (taskResult.isDiscarded()) {
discardedCount++;
} else {
failedCount++;
if (results.size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures
results.add(taskResult);
}
}
}
public abstract JobType getJobType(); public abstract JobType getJobType();
} }

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.task.TaskResult;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -13,22 +13,24 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.common.data.job; package org.thingsboard.server.common.data.job.task;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.thingsboard.server.common.data.job.JobType;
import java.util.List; import java.util.List;
import java.util.Optional;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@SuperBuilder @SuperBuilder
@ToString(callSuper = true) @ToString(callSuper = true)
public class DummyTask extends Task { public class DummyTask extends Task<DummyTaskResult> {
private int number; private int number;
private long processingTimeMs; private long processingTimeMs;
@ -41,8 +43,17 @@ public class DummyTask extends Task {
} }
@Override @Override
public TaskFailure toFailure(Throwable error) { public DummyTaskResult toResult(boolean discarded, Optional<Throwable> error) {
return new DummyTaskFailure(number, failAlways, error.getMessage()); var result = DummyTaskResult.builder();
result.discarded(discarded);
if (error.isPresent()) {
result.failure(DummyTaskFailure.builder()
.error(error.map(Throwable::getMessage).orElse(null))
.number(number)
.failAlways(failAlways)
.build());
}
return result.build();
} }
@Override @Override
@ -51,24 +62,14 @@ public class DummyTask extends Task {
} }
@Data @Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor @NoArgsConstructor
public static class DummyTaskFailure extends TaskFailure { @EqualsAndHashCode(callSuper = true)
@SuperBuilder
public static class DummyTaskFailure extends TaskFailure { // todo: do we need separate structure?
private int number; private int number;
private boolean failAlways; private boolean failAlways;
public DummyTaskFailure(int number, boolean failAlways, String error) {
super(error);
this.number = number;
this.failAlways = failAlways;
}
@Override
public JobType getJobType() {
return JobType.DUMMY;
}
} }
} }

View File

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.job.task;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure;
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@SuperBuilder
public class DummyTaskResult extends TaskResult {
private static final DummyTaskResult SUCCESS = new DummyTaskResult(true);
private DummyTaskFailure failure;
public DummyTaskResult(boolean success) {
super(success);
}
public static DummyTaskResult success() {
return SUCCESS;
}
@Override
public JobType getJobType() {
return JobType.DUMMY;
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.common.data.job; package org.thingsboard.server.common.data.job.task;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@ -25,6 +25,9 @@ import lombok.Data;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import java.util.Optional;
@Data @Data
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
@ -34,7 +37,7 @@ import org.thingsboard.server.common.data.id.TenantId;
}) })
@SuperBuilder @SuperBuilder
@AllArgsConstructor @AllArgsConstructor
public abstract class Task { public abstract class Task<R extends TaskResult> {
private TenantId tenantId; private TenantId tenantId;
private JobId jobId; private JobId jobId;
@ -48,7 +51,7 @@ public abstract class Task {
@JsonIgnore @JsonIgnore
public abstract Object getKey(); public abstract Object getKey();
public abstract TaskFailure toFailure(Throwable error); public abstract R toResult(boolean discarded, Optional<Throwable> error);
public abstract JobType getJobType(); public abstract JobType getJobType();

View File

@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.common.data.job; package org.thingsboard.server.common.data.job.task;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@Builder @SuperBuilder
public class TaskResult { public abstract class TaskFailure {
private boolean success; private String error;
private boolean discarded;
private TaskFailure failure;
} }

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.common.data.job; package org.thingsboard.server.common.data.job.task;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
@ -22,19 +22,26 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; import lombok.experimental.SuperBuilder;
import org.thingsboard.server.common.data.job.JobType;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@SuperBuilder
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
@JsonSubTypes({ @JsonSubTypes({
@Type(name = "DUMMY", value = DummyTaskFailure.class) @Type(name = "DUMMY", value = DummyTaskResult.class)
}) })
public abstract class TaskFailure { public abstract class TaskResult {
private String error; private boolean success;
private boolean discarded;
public TaskResult(boolean success) {
this.success = success;
}
public abstract JobType getJobType(); public abstract JobType getJobType();

View File

@ -22,7 +22,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto; import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto;

View File

@ -25,8 +25,8 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.task.Task;
import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
@ -37,13 +37,14 @@ import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public abstract class TaskProcessor<T extends Task, R> { public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
protected final Logger log = LoggerFactory.getLogger(getClass()); protected final Logger log = LoggerFactory.getLogger(getClass());
@ -98,16 +99,17 @@ public abstract class TaskProcessor<T extends Task, R> {
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) throws Exception { private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) throws Exception {
for (TbProtoQueueMsg<TaskProto> msg : msgs) { for (TbProtoQueueMsg<TaskProto> msg : msgs) {
try { try {
Task task = JacksonUtil.fromString(msg.getValue().getValue(), Task.class); @SuppressWarnings("unchecked")
T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class);
if (discardedJobs.contains(task.getJobId().getId())) { if (discardedJobs.contains(task.getJobId().getId())) {
log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId()); log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId());
reportCancelled(task); reportTaskDiscarded(task);
continue; continue;
} else if (deletedTenants.contains(task.getTenantId().getId())) { } else if (deletedTenants.contains(task.getTenantId().getId())) {
log.info("Skipping task '{}' for deleted tenant {}", task.getKey(), task.getTenantId()); log.info("Skipping task '{}' for deleted tenant {}", task.getKey(), task.getTenantId());
continue; continue;
} }
processTask((T) task); processTask(task);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -121,8 +123,8 @@ public abstract class TaskProcessor<T extends Task, R> {
task.setAttempt(task.getAttempt() + 1); task.setAttempt(task.getAttempt() + 1);
log.info("Processing task: {}", task); log.info("Processing task: {}", task);
try { try {
process(task); R result = process(task);
reportSuccess(task); reportTaskResult(task, result);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -130,31 +132,22 @@ public abstract class TaskProcessor<T extends Task, R> {
if (task.getAttempt() <= task.getRetries()) { if (task.getAttempt() <= task.getRetries()) {
processTask(task); processTask(task);
} else { } else {
reportFailure(task, e); reportTaskFailure(task, e);
} }
} }
} }
public abstract R process(T task) throws Exception; public abstract R process(T task) throws Exception;
private void reportSuccess(Task task) { private void reportTaskFailure(T task, Throwable error) {
TaskResult result = TaskResult.builder() reportTaskResult(task, task.toResult(false, Optional.of(error)));
.success(true)
.build();
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
} }
private void reportFailure(Task task, Throwable error) { private void reportTaskDiscarded(T task) {
TaskResult result = TaskResult.builder() reportTaskResult(task, task.toResult(true, Optional.empty()));
.failure(task.toFailure(error))
.build();
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
} }
private void reportCancelled(Task task) { private void reportTaskResult(T task, R result) {
TaskResult result = TaskResult.builder()
.discarded(true)
.build();
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
} }

View File

@ -29,8 +29,7 @@ import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.entity.AbstractEntityService;
@ -118,17 +117,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
boolean publishEvent = false; boolean publishEvent = false;
for (TaskResult taskResult : jobStats.getTaskResults()) { for (TaskResult taskResult : jobStats.getTaskResults()) {
if (taskResult.isSuccess()) { result.processTaskResult(taskResult);
result.setSuccessfulCount(result.getSuccessfulCount() + 1);
} else if (taskResult.isDiscarded()) {
result.setDiscardedCount(result.getDiscardedCount() + 1);
} else {
TaskFailure failure = taskResult.getFailure();
result.setFailedCount(result.getFailedCount() + 1);
if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures
result.getFailures().add(failure);
}
}
if (result.getCancellationTs() > 0) { if (result.getCancellationTs() > 0) {
if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) { if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) {