Improvements for task processing
This commit is contained in:
		
							parent
							
								
									a736f5d212
								
							
						
					
					
						commit
						957965b351
					
				@ -89,7 +89,7 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void testSubmitJob_allTasksSuccessful() {
 | 
					    public void testSubmitJob_allTasksSuccessful() {
 | 
				
			||||||
        int tasksCount = 5;
 | 
					        int tasksCount = 7;
 | 
				
			||||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
					        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
				
			||||||
                .successfulTasksCount(tasksCount)
 | 
					                .successfulTasksCount(tasksCount)
 | 
				
			||||||
                .taskProcessingTimeMs(1000)
 | 
					                .taskProcessingTimeMs(1000)
 | 
				
			||||||
@ -154,10 +154,10 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void testCancelJob_whileRunning() throws Exception {
 | 
					    public void testCancelJob_whileRunning() throws Exception {
 | 
				
			||||||
        int tasksCount = 100;
 | 
					        int tasksCount = 200;
 | 
				
			||||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
					        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
				
			||||||
                .successfulTasksCount(tasksCount)
 | 
					                .successfulTasksCount(tasksCount)
 | 
				
			||||||
                .taskProcessingTimeMs(100)
 | 
					                .taskProcessingTimeMs(50)
 | 
				
			||||||
                .build()).getId();
 | 
					                .build()).getId();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Thread.sleep(500);
 | 
					        Thread.sleep(500);
 | 
				
			||||||
 | 
				
			|||||||
@ -15,6 +15,7 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.server.common.data.job.task;
 | 
					package org.thingsboard.server.common.data.job.task;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import lombok.Builder;
 | 
				
			||||||
import lombok.Data;
 | 
					import lombok.Data;
 | 
				
			||||||
import lombok.EqualsAndHashCode;
 | 
					import lombok.EqualsAndHashCode;
 | 
				
			||||||
import lombok.NoArgsConstructor;
 | 
					import lombok.NoArgsConstructor;
 | 
				
			||||||
@ -25,22 +26,25 @@ import org.thingsboard.server.common.data.job.JobType;
 | 
				
			|||||||
@Data
 | 
					@Data
 | 
				
			||||||
@EqualsAndHashCode(callSuper = true)
 | 
					@EqualsAndHashCode(callSuper = true)
 | 
				
			||||||
@NoArgsConstructor
 | 
					@NoArgsConstructor
 | 
				
			||||||
@SuperBuilder
 | 
					 | 
				
			||||||
@ToString(callSuper = true)
 | 
					@ToString(callSuper = true)
 | 
				
			||||||
public class DummyTaskResult extends TaskResult {
 | 
					public class DummyTaskResult extends TaskResult {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private DummyTaskFailure failure;
 | 
					    private DummyTaskFailure failure;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Builder
 | 
				
			||||||
 | 
					    private DummyTaskResult(boolean success, boolean discarded, DummyTaskFailure failure) {
 | 
				
			||||||
 | 
					        super(success, discarded);
 | 
				
			||||||
 | 
					        this.failure = failure;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public static DummyTaskResult success(DummyTask task) {
 | 
					    public static DummyTaskResult success(DummyTask task) {
 | 
				
			||||||
        return DummyTaskResult.builder()
 | 
					        return DummyTaskResult.builder()
 | 
				
			||||||
                .key(task.getKey())
 | 
					 | 
				
			||||||
                .success(true)
 | 
					                .success(true)
 | 
				
			||||||
                .build();
 | 
					                .build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public static DummyTaskResult failed(DummyTask task, Throwable error) {
 | 
					    public static DummyTaskResult failed(DummyTask task, Throwable error) {
 | 
				
			||||||
        return DummyTaskResult.builder()
 | 
					        return DummyTaskResult.builder()
 | 
				
			||||||
                .key(task.getKey())
 | 
					 | 
				
			||||||
                .failure(DummyTaskFailure.builder()
 | 
					                .failure(DummyTaskFailure.builder()
 | 
				
			||||||
                        .error(error.getMessage())
 | 
					                        .error(error.getMessage())
 | 
				
			||||||
                        .number(task.getNumber())
 | 
					                        .number(task.getNumber())
 | 
				
			||||||
@ -51,7 +55,6 @@ public class DummyTaskResult extends TaskResult {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    public static DummyTaskResult discarded(DummyTask task) {
 | 
					    public static DummyTaskResult discarded(DummyTask task) {
 | 
				
			||||||
        return DummyTaskResult.builder()
 | 
					        return DummyTaskResult.builder()
 | 
				
			||||||
                .key(task.getKey())
 | 
					 | 
				
			||||||
                .discarded(true)
 | 
					                .discarded(true)
 | 
				
			||||||
                .build();
 | 
					                .build();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -20,16 +20,12 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 | 
				
			|||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
					import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
				
			||||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 | 
					import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 | 
				
			||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
					import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
				
			||||||
import lombok.AllArgsConstructor;
 | 
					 | 
				
			||||||
import lombok.Data;
 | 
					import lombok.Data;
 | 
				
			||||||
import lombok.NoArgsConstructor;
 | 
					import lombok.NoArgsConstructor;
 | 
				
			||||||
import lombok.experimental.SuperBuilder;
 | 
					 | 
				
			||||||
import org.thingsboard.server.common.data.job.JobType;
 | 
					import org.thingsboard.server.common.data.job.JobType;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Data
 | 
					@Data
 | 
				
			||||||
@AllArgsConstructor
 | 
					 | 
				
			||||||
@NoArgsConstructor
 | 
					@NoArgsConstructor
 | 
				
			||||||
@SuperBuilder
 | 
					 | 
				
			||||||
@JsonIgnoreProperties(ignoreUnknown = true)
 | 
					@JsonIgnoreProperties(ignoreUnknown = true)
 | 
				
			||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
 | 
					@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
 | 
				
			||||||
@JsonSubTypes({
 | 
					@JsonSubTypes({
 | 
				
			||||||
@ -40,6 +36,12 @@ public abstract class TaskResult {
 | 
				
			|||||||
    private String key;
 | 
					    private String key;
 | 
				
			||||||
    private boolean success;
 | 
					    private boolean success;
 | 
				
			||||||
    private boolean discarded;
 | 
					    private boolean discarded;
 | 
				
			||||||
 | 
					    private long finishTs;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    protected TaskResult(boolean success, boolean discarded) {
 | 
				
			||||||
 | 
					        this.success = success;
 | 
				
			||||||
 | 
					        this.discarded = discarded;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @JsonIgnore
 | 
					    @JsonIgnore
 | 
				
			||||||
    public abstract JobType getJobType();
 | 
					    public abstract JobType getJobType();
 | 
				
			||||||
 | 
				
			|||||||
@ -232,6 +232,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void reportTaskResult(T task, R result) {
 | 
					    private void reportTaskResult(T task, R result) {
 | 
				
			||||||
 | 
					        result.setKey(task.getKey());
 | 
				
			||||||
 | 
					        result.setFinishTs(System.currentTimeMillis());
 | 
				
			||||||
        statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
 | 
					        statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -69,7 +69,6 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
				
			|||||||
            job.setStatus(QUEUED);
 | 
					            job.setStatus(QUEUED);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            job.setStatus(PENDING);
 | 
					            job.setStatus(PENDING);
 | 
				
			||||||
            job.getResult().setStartTs(System.currentTimeMillis());
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        return saveJob(tenantId, job, true, null);
 | 
					        return saveJob(tenantId, job, true, null);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -125,6 +124,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        boolean publishEvent = false;
 | 
					        boolean publishEvent = false;
 | 
				
			||||||
 | 
					        long lastFinishTs = 0;
 | 
				
			||||||
        for (TaskResult taskResult : jobStats.getTaskResults()) {
 | 
					        for (TaskResult taskResult : jobStats.getTaskResults()) {
 | 
				
			||||||
            if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) {
 | 
					            if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) {
 | 
				
			||||||
                log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey());
 | 
					                log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey());
 | 
				
			||||||
@ -140,6 +140,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
				
			|||||||
                    publishEvent = true;
 | 
					                    publishEvent = true;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            if (taskResult.getFinishTs() > lastFinishTs) {
 | 
				
			||||||
 | 
					                lastFinishTs = taskResult.getFinishTs();
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (job.getStatus() == RUNNING) {
 | 
					        if (job.getStatus() == RUNNING) {
 | 
				
			||||||
@ -153,7 +156,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
				
			|||||||
                    job.setStatus(COMPLETED);
 | 
					                    job.setStatus(COMPLETED);
 | 
				
			||||||
                    publishEvent = true;
 | 
					                    publishEvent = true;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                result.setFinishTs(System.currentTimeMillis());
 | 
					                result.setFinishTs(lastFinishTs);
 | 
				
			||||||
                job.getConfiguration().setToReprocess(null);
 | 
					                job.getConfiguration().setToReprocess(null);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -166,6 +169,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
				
			|||||||
        if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) {
 | 
					        if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) {
 | 
				
			||||||
            throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType());
 | 
					            throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        if (job.getStatus() == PENDING) {
 | 
				
			||||||
 | 
					            job.getResult().setStartTs(System.currentTimeMillis());
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        job = jobDao.save(tenantId, job);
 | 
					        job = jobDao.save(tenantId, job);
 | 
				
			||||||
        if (publishEvent) {
 | 
					        if (publishEvent) {
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user