diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index b23722afd5..8da1be43f1 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -89,7 +89,7 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testSubmitJob_allTasksSuccessful() { - int tasksCount = 5; + int tasksCount = 7; JobId jobId = submitJob(DummyJobConfiguration.builder() .successfulTasksCount(tasksCount) .taskProcessingTimeMs(1000) @@ -154,10 +154,10 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testCancelJob_whileRunning() throws Exception { - int tasksCount = 100; + int tasksCount = 200; JobId jobId = submitJob(DummyJobConfiguration.builder() .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(100) + .taskProcessingTimeMs(50) .build()).getId(); Thread.sleep(500); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java index 1988f13eb0..5b913af3e5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.job.task; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; @@ -25,22 +26,25 @@ import org.thingsboard.server.common.data.job.JobType; @Data @EqualsAndHashCode(callSuper = true) @NoArgsConstructor -@SuperBuilder @ToString(callSuper = true) public class DummyTaskResult extends TaskResult { private DummyTaskFailure failure; + @Builder + private DummyTaskResult(boolean success, boolean discarded, DummyTaskFailure failure) { + super(success, discarded); + this.failure = failure; + } + public static DummyTaskResult success(DummyTask task) { return DummyTaskResult.builder() - .key(task.getKey()) .success(true) .build(); } public static DummyTaskResult failed(DummyTask task, Throwable error) { return DummyTaskResult.builder() - .key(task.getKey()) .failure(DummyTaskFailure.builder() .error(error.getMessage()) .number(task.getNumber()) @@ -51,7 +55,6 @@ public class DummyTaskResult extends TaskResult { public static DummyTaskResult discarded(DummyTask task) { return DummyTaskResult.builder() - .key(task.getKey()) .discarded(true) .build(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java index 21303a55fe..da3c8252eb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java @@ -20,16 +20,12 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; import org.thingsboard.server.common.data.job.JobType; @Data -@AllArgsConstructor @NoArgsConstructor -@SuperBuilder @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") @JsonSubTypes({ @@ -40,6 +36,12 @@ public abstract class TaskResult { private String key; private boolean success; private boolean discarded; + private long finishTs; + + protected TaskResult(boolean success, boolean discarded) { + this.success = success; + this.discarded = discarded; + } @JsonIgnore public abstract JobType getJobType(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 62ca19a05f..33c52859ca 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -232,6 +232,8 @@ public abstract class TaskProcessor, R extends TaskResult> { } private void reportTaskResult(T task, R result) { + result.setKey(task.getKey()); + result.setFinishTs(System.currentTimeMillis()); statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 153e95a404..360aa0063b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -69,7 +69,6 @@ public class DefaultJobService extends AbstractEntityService implements JobServi job.setStatus(QUEUED); } else { job.setStatus(PENDING); - job.getResult().setStartTs(System.currentTimeMillis()); } return saveJob(tenantId, job, true, null); } @@ -125,6 +124,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi } boolean publishEvent = false; + long lastFinishTs = 0; for (TaskResult taskResult : jobStats.getTaskResults()) { if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) { log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey()); @@ -140,6 +140,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi publishEvent = true; } } + if (taskResult.getFinishTs() > lastFinishTs) { + lastFinishTs = taskResult.getFinishTs(); + } } if (job.getStatus() == RUNNING) { @@ -153,7 +156,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi job.setStatus(COMPLETED); publishEvent = true; } - result.setFinishTs(System.currentTimeMillis()); + result.setFinishTs(lastFinishTs); job.getConfiguration().setToReprocess(null); } } @@ -166,6 +169,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) { throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType()); } + if (job.getStatus() == PENDING) { + job.getResult().setStartTs(System.currentTimeMillis()); + } job = jobDao.save(tenantId, job); if (publishEvent) {