diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index b72974144d..3d601a9358 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -31,9 +31,8 @@ import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; -import org.thingsboard.server.common.data.job.Task; -import org.thingsboard.server.common.data.job.TaskFailure; -import org.thingsboard.server.common.data.job.TaskResult; +import org.thingsboard.server.common.data.job.task.Task; +import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; @@ -50,7 +49,6 @@ import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -118,7 +116,7 @@ public class DefaultJobManager implements JobManager { JobId jobId = job.getId(); try { JobProcessor processor = jobProcessors.get(job.getType()); - List toReprocess = job.getConfiguration().getToReprocess(); + List toReprocess = job.getConfiguration().getToReprocess(); if (toReprocess == null) { int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); @@ -155,20 +153,24 @@ public class DefaultJobManager implements JobManager { if (result.getGeneralError() != null) { throw new IllegalArgumentException("Reprocessing not allowed since job has general error"); } - List failures = result.getFailures(); - if (result.getFailedCount() > failures.size()) { - throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + failures.size() + ")"); + List taskFailures = result.getResults().stream() + .filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded()) + .toList(); + if (result.getFailedCount() > taskFailures.size()) { + throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")"); } result.setFailedCount(0); - result.setFailures(Collections.emptyList()); + result.setResults(result.getResults().stream() + .filter(TaskResult::isSuccess) + .toList()); - job.getConfiguration().setToReprocess(failures); + job.getConfiguration().setToReprocess(taskFailures); jobService.submitJob(tenantId, job); } - private void submitTask(Task task) { + private void submitTask(Task task) { log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); TaskProto taskProto = TaskProto.newBuilder() .setValue(JacksonUtil.toString(task)) diff --git a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java index 900b876c09..64148af259 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java @@ -18,12 +18,13 @@ package org.thingsboard.server.service.job; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.job.DummyJobConfiguration; -import org.thingsboard.server.common.data.job.DummyTask; -import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; -import org.thingsboard.server.common.data.job.Task; -import org.thingsboard.server.common.data.job.TaskFailure; +import org.thingsboard.server.common.data.job.task.DummyTask; +import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure; +import org.thingsboard.server.common.data.job.task.DummyTaskResult; +import org.thingsboard.server.common.data.job.task.Task; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.util.Collections; import java.util.List; @@ -34,7 +35,7 @@ import java.util.function.Consumer; public class DummyJobProcessor implements JobProcessor { @Override - public int process(Job job, Consumer taskConsumer) throws Exception { + public int process(Job job, Consumer> taskConsumer) throws Exception { DummyJobConfiguration configuration = job.getConfiguration(); if (configuration.getGeneralError() != null) { for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) { @@ -63,15 +64,15 @@ public class DummyJobProcessor implements JobProcessor { } @Override - public void reprocess(Job job, List failures, Consumer taskConsumer) throws Exception { - for (TaskFailure failure : failures) { - DummyTaskFailure taskFailure = (DummyTaskFailure) failure; - taskConsumer.accept(createTask(job, job.getConfiguration(), taskFailure.getNumber(), taskFailure.isFailAlways() ? - List.of(taskFailure.getError()) : Collections.emptyList(), taskFailure.isFailAlways())); + public void reprocess(Job job, List taskFailures, Consumer> taskConsumer) throws Exception { + for (TaskResult taskFailure : taskFailures) { + DummyTaskFailure failure = ((DummyTaskResult) taskFailure).getFailure(); + taskConsumer.accept(createTask(job, job.getConfiguration(), failure.getNumber(), failure.isFailAlways() ? + List.of(failure.getError()) : Collections.emptyList(), failure.isFailAlways())); } } - private Task createTask(Job job, DummyJobConfiguration configuration, int number, List errors, boolean failAlways) { + private DummyTask createTask(Job job, DummyJobConfiguration configuration, int number, List errors, boolean failAlways) { return DummyTask.builder() .tenantId(job.getTenantId()) .jobId(job.getId()) diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java index da2c75d166..301d4bf6eb 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java @@ -17,17 +17,17 @@ package org.thingsboard.server.service.job; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; -import org.thingsboard.server.common.data.job.Task; -import org.thingsboard.server.common.data.job.TaskFailure; +import org.thingsboard.server.common.data.job.task.Task; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.util.List; import java.util.function.Consumer; public interface JobProcessor { - int process(Job job, Consumer taskConsumer) throws Exception; + int process(Job job, Consumer> taskConsumer) throws Exception; - void reprocess(Job job, List failures, Consumer taskConsumer) throws Exception; + void reprocess(Job job, List taskFailures, Consumer> taskConsumer) throws Exception; JobType getType(); diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 361dbfbde1..5178461746 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -17,16 +17,17 @@ package org.thingsboard.server.service.job.task; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.job.DummyTask; +import org.thingsboard.server.common.data.job.task.DummyTask; import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.queue.task.TaskProcessor; @Component @RequiredArgsConstructor -public class DummyTaskProcessor extends TaskProcessor { +public class DummyTaskProcessor extends TaskProcessor { @Override - public Void process(DummyTask task) throws Exception { + public DummyTaskResult process(DummyTask task) throws Exception { if (task.getProcessingTimeMs() > 0) { Thread.sleep(task.getProcessingTimeMs()); } @@ -37,7 +38,7 @@ public class DummyTaskProcessor extends TaskProcessor { String error = task.getErrors().get(task.getAttempt() - 1); throw new RuntimeException(error); } - return null; + return DummyTaskResult.success(); } @Override diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index f9881d0248..d7d22a2859 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -26,11 +26,12 @@ import org.springframework.test.context.TestPropertySource; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.DummyJobConfiguration; -import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure; +import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.job.JobService; @@ -101,7 +102,7 @@ public class JobManagerTest extends AbstractControllerTest { Job job = findJobById(jobId); assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); - assertThat(job.getResult().getFailures()).isEmpty(); + assertThat(job.getResult().getResults()).isEmpty(); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); } @@ -131,8 +132,8 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks); - assertThat(jobResult.getFailures().get(0).getError()).isEqualTo("error3"); // last error - assertThat(jobResult.getFailures().get(1).getError()).isEqualTo("error3"); // last error + assertThat(((DummyTaskResult) jobResult.getResults().get(0)).getFailure().getError()).isEqualTo("error3"); // last error + assertThat(((DummyTaskResult) jobResult.getResults().get(1)).getFailure().getError()).isEqualTo("error3"); // last error assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); }); } @@ -353,7 +354,7 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { - DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure(); assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getError()).isEqualTo("error"); } @@ -367,7 +368,7 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount); assertThat(job.getResult().getFailedCount()).isZero(); assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount); - assertThat(job.getResult().getFailures()).isEmpty(); + assertThat(job.getResult().getResults()).isEmpty(); }); } @@ -400,7 +401,7 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { - DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure(); assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getError()).isEqualTo("error"); } @@ -417,7 +418,7 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { - DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + DummyTaskFailure failure = ((DummyTaskResult) jobResult.getResults().get(i)).getFailure(); assertThat(failure.getNumber()).isEqualTo(taskNumber); assertThat(failure.getError()).isEqualTo("error"); assertThat(failure.isFailAlways()).isTrue(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java index 7a2eccd42a..8aed4adbe5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.io.Serializable; import java.util.List; @@ -32,7 +33,7 @@ import java.util.List; @Data public abstract class JobConfiguration implements Serializable { - private List toReprocess; + private List toReprocess; public abstract JobType getType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index bf5e5f2c56..4e4787bbe5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.io.Serializable; import java.util.ArrayList; @@ -40,7 +41,7 @@ public abstract class JobResult implements Serializable { private int failedCount; private int discardedCount; private Integer totalCount = null; // set when all tasks are submitted - private List failures = new ArrayList<>(); + private List results = new ArrayList<>(); private String generalError; private long cancellationTs; @@ -50,6 +51,19 @@ public abstract class JobResult implements Serializable { return successfulCount + failedCount + discardedCount; } + public void processTaskResult(TaskResult taskResult) { + if (taskResult.isSuccess()) { + successfulCount++; + } else if (taskResult.isDiscarded()) { + discardedCount++; + } else { + failedCount++; + if (results.size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures + results.add(taskResult); + } + } + } + public abstract JobType getJobType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java index 9d2c3d9be2..dc3e265f2d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStats.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job; import lombok.Data; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.util.ArrayList; import java.util.List; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java similarity index 63% rename from common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java rename to common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java index ac15dc63cc..8c3ed31e50 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java @@ -13,22 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.job; +package org.thingsboard.server.common.data.job.task; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; +import org.thingsboard.server.common.data.job.JobType; import java.util.List; +import java.util.Optional; @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = true) @SuperBuilder @ToString(callSuper = true) -public class DummyTask extends Task { +public class DummyTask extends Task { private int number; private long processingTimeMs; @@ -41,8 +43,17 @@ public class DummyTask extends Task { } @Override - public TaskFailure toFailure(Throwable error) { - return new DummyTaskFailure(number, failAlways, error.getMessage()); + public DummyTaskResult toResult(boolean discarded, Optional error) { + var result = DummyTaskResult.builder(); + result.discarded(discarded); + if (error.isPresent()) { + result.failure(DummyTaskFailure.builder() + .error(error.map(Throwable::getMessage).orElse(null)) + .number(number) + .failAlways(failAlways) + .build()); + } + return result.build(); } @Override @@ -51,24 +62,14 @@ public class DummyTask extends Task { } @Data - @EqualsAndHashCode(callSuper = true) @NoArgsConstructor - public static class DummyTaskFailure extends TaskFailure { + @EqualsAndHashCode(callSuper = true) + @SuperBuilder + public static class DummyTaskFailure extends TaskFailure { // todo: do we need separate structure? private int number; private boolean failAlways; - public DummyTaskFailure(int number, boolean failAlways, String error) { - super(error); - this.number = number; - this.failAlways = failAlways; - } - - @Override - public JobType getJobType() { - return JobType.DUMMY; - } - } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java new file mode 100644 index 0000000000..7c36636181 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.job.task; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.job.task.DummyTask.DummyTaskFailure; + +@Data +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@SuperBuilder +public class DummyTaskResult extends TaskResult { + + private static final DummyTaskResult SUCCESS = new DummyTaskResult(true); + + private DummyTaskFailure failure; + + public DummyTaskResult(boolean success) { + super(success); + } + + public static DummyTaskResult success() { + return SUCCESS; + } + + @Override + public JobType getJobType() { + return JobType.DUMMY; + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java similarity index 85% rename from common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java rename to common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java index ad7c6b62df..5cf39ec6fb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.job; +package org.thingsboard.server.common.data.job.task; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -25,6 +25,9 @@ import lombok.Data; import lombok.experimental.SuperBuilder; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.job.JobType; + +import java.util.Optional; @Data @JsonIgnoreProperties(ignoreUnknown = true) @@ -34,7 +37,7 @@ import org.thingsboard.server.common.data.id.TenantId; }) @SuperBuilder @AllArgsConstructor -public abstract class Task { +public abstract class Task { private TenantId tenantId; private JobId jobId; @@ -48,7 +51,7 @@ public abstract class Task { @JsonIgnore public abstract Object getKey(); - public abstract TaskFailure toFailure(Throwable error); + public abstract R toResult(boolean discarded, Optional error); public abstract JobType getJobType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskFailure.java similarity index 79% rename from common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java rename to common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskFailure.java index 0ee9a2b477..ce22a08269 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskFailure.java @@ -13,21 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.job; +package org.thingsboard.server.common.data.job.task; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; @Data @AllArgsConstructor @NoArgsConstructor -@Builder -public class TaskResult { +@SuperBuilder +public abstract class TaskFailure { - private boolean success; - private boolean discarded; - private TaskFailure failure; + private String error; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java similarity index 74% rename from common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java rename to common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java index 7a04db8188..40f0763e3a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.job; +package org.thingsboard.server.common.data.job.task; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -22,19 +22,26 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; +import lombok.experimental.SuperBuilder; +import org.thingsboard.server.common.data.job.JobType; @Data @AllArgsConstructor @NoArgsConstructor +@SuperBuilder @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") @JsonSubTypes({ - @Type(name = "DUMMY", value = DummyTaskFailure.class) + @Type(name = "DUMMY", value = DummyTaskResult.class) }) -public abstract class TaskFailure { +public abstract class TaskResult { - private String error; + private boolean success; + private boolean discarded; + + public TaskResult(boolean success) { + this.success = success; + } public abstract JobType getJobType(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java index 8d69f5781b..c3780f15e7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java @@ -22,7 +22,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.job.TaskResult; +import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 57dd46a6e5..25e9d8807d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -25,8 +25,8 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.job.JobType; -import org.thingsboard.server.common.data.job.Task; -import org.thingsboard.server.common.data.job.TaskResult; +import org.thingsboard.server.common.data.job.task.Task; +import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; @@ -37,13 +37,14 @@ import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -public abstract class TaskProcessor { +public abstract class TaskProcessor, R extends TaskResult> { protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -98,16 +99,17 @@ public abstract class TaskProcessor { private void processMsgs(List> msgs, TbQueueConsumer> consumer) throws Exception { for (TbProtoQueueMsg msg : msgs) { try { - Task task = JacksonUtil.fromString(msg.getValue().getValue(), Task.class); + @SuppressWarnings("unchecked") + T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class); if (discardedJobs.contains(task.getJobId().getId())) { log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId()); - reportCancelled(task); + reportTaskDiscarded(task); continue; } else if (deletedTenants.contains(task.getTenantId().getId())) { log.info("Skipping task '{}' for deleted tenant {}", task.getKey(), task.getTenantId()); continue; } - processTask((T) task); + processTask(task); } catch (InterruptedException e) { throw e; } catch (Exception e) { @@ -121,8 +123,8 @@ public abstract class TaskProcessor { task.setAttempt(task.getAttempt() + 1); log.info("Processing task: {}", task); try { - process(task); - reportSuccess(task); + R result = process(task); + reportTaskResult(task, result); } catch (InterruptedException e) { throw e; } catch (Exception e) { @@ -130,31 +132,22 @@ public abstract class TaskProcessor { if (task.getAttempt() <= task.getRetries()) { processTask(task); } else { - reportFailure(task, e); + reportTaskFailure(task, e); } } } public abstract R process(T task) throws Exception; - private void reportSuccess(Task task) { - TaskResult result = TaskResult.builder() - .success(true) - .build(); - statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); + private void reportTaskFailure(T task, Throwable error) { + reportTaskResult(task, task.toResult(false, Optional.of(error))); } - private void reportFailure(Task task, Throwable error) { - TaskResult result = TaskResult.builder() - .failure(task.toFailure(error)) - .build(); - statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); + private void reportTaskDiscarded(T task) { + reportTaskResult(task, task.toResult(true, Optional.empty())); } - private void reportCancelled(Task task) { - TaskResult result = TaskResult.builder() - .discarded(true) - .build(); + private void reportTaskResult(T task, R result) { statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 3e511ccfe4..07b5aeadf9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -29,8 +29,7 @@ import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; -import org.thingsboard.server.common.data.job.TaskFailure; -import org.thingsboard.server.common.data.job.TaskResult; +import org.thingsboard.server.common.data.job.task.TaskResult; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; @@ -118,17 +117,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi boolean publishEvent = false; for (TaskResult taskResult : jobStats.getTaskResults()) { - if (taskResult.isSuccess()) { - result.setSuccessfulCount(result.getSuccessfulCount() + 1); - } else if (taskResult.isDiscarded()) { - result.setDiscardedCount(result.getDiscardedCount() + 1); - } else { - TaskFailure failure = taskResult.getFailure(); - result.setFailedCount(result.getFailedCount() + 1); - if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures - result.getFailures().add(failure); - } - } + result.processTaskResult(taskResult); if (result.getCancellationTs() > 0) { if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) {