Notification on job finish
This commit is contained in:
		
							parent
							
								
									950d1d85c4
								
							
						
					
					
						commit
						f4cd471082
					
				@ -304,7 +304,7 @@ public class EntityStateSourcingListener {
 | 
			
		||||
 | 
			
		||||
    private void onJobUpdate(Job job) {
 | 
			
		||||
        jobManager.onJobUpdate(job);
 | 
			
		||||
        if (job.getResult().getCancellationTs() > 0 || job.getStatus().isOneOf(JobStatus.FAILED)) {
 | 
			
		||||
        if (job.getResult().getCancellationTs() > 0 || (job.getStatus().isOneOf(JobStatus.FAILED) && job.getResult().getGeneralError() != null)) {
 | 
			
		||||
            // task processors will add this job to the list of discarded
 | 
			
		||||
            tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -18,12 +18,12 @@ package org.thingsboard.server.service.job;
 | 
			
		||||
import jakarta.annotation.PreDestroy;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.exception.ExceptionUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
@ -33,8 +33,12 @@ import org.thingsboard.server.common.data.job.JobStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.job.JobType;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.Task;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.TaskResult;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationTemplate;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobService;
 | 
			
		||||
import org.thingsboard.server.dao.notification.DefaultNotifications;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
@ -65,6 +69,7 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
 | 
			
		||||
    private final JobService jobService;
 | 
			
		||||
    private final JobStatsService jobStatsService;
 | 
			
		||||
    private final NotificationCenter notificationCenter;
 | 
			
		||||
    private final Map<JobType, JobProcessor> jobProcessors;
 | 
			
		||||
    private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
 | 
			
		||||
    private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
 | 
			
		||||
@ -74,9 +79,11 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
    @Value("${queue.tasks.stats.processing_interval_ms:5000}")
 | 
			
		||||
    private int statsProcessingInterval;
 | 
			
		||||
 | 
			
		||||
    public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, TbCoreQueueFactory queueFactory, List<JobProcessor> jobProcessors) {
 | 
			
		||||
    public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter,
 | 
			
		||||
                             TbCoreQueueFactory queueFactory, List<JobProcessor> jobProcessors) {
 | 
			
		||||
        this.jobService = jobService;
 | 
			
		||||
        this.jobStatsService = jobStatsService;
 | 
			
		||||
        this.notificationCenter = notificationCenter;
 | 
			
		||||
        this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
 | 
			
		||||
        this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
 | 
			
		||||
        this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass());
 | 
			
		||||
@ -104,10 +111,29 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onJobUpdate(Job job) {
 | 
			
		||||
        if (job.getStatus() == JobStatus.PENDING) {
 | 
			
		||||
            executor.execute(() -> {
 | 
			
		||||
                processJob(job);
 | 
			
		||||
            });
 | 
			
		||||
        JobStatus status = job.getStatus();
 | 
			
		||||
        switch (status) {
 | 
			
		||||
            case PENDING -> {
 | 
			
		||||
                executor.execute(() -> {
 | 
			
		||||
                    try {
 | 
			
		||||
                        processJob(job);
 | 
			
		||||
                    } catch (Throwable e) {
 | 
			
		||||
                        log.error("Failed to process job update: {}", job, e);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
            case COMPLETED, FAILED -> {
 | 
			
		||||
                executor.execute(() -> {
 | 
			
		||||
                    try {
 | 
			
		||||
                        if (status == JobStatus.COMPLETED) {
 | 
			
		||||
                            getJobProcessor(job.getType()).onJobCompleted(job);
 | 
			
		||||
                        }
 | 
			
		||||
                        sendJobFinishedNotification(job);
 | 
			
		||||
                    } catch (Throwable e) {
 | 
			
		||||
                        log.error("Failed to process job update: {}", job, e);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -115,7 +141,7 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
        TenantId tenantId = job.getTenantId();
 | 
			
		||||
        JobId jobId = job.getId();
 | 
			
		||||
        try {
 | 
			
		||||
            JobProcessor processor = jobProcessors.get(job.getType());
 | 
			
		||||
            JobProcessor processor = getJobProcessor(job.getType());
 | 
			
		||||
            List<TaskResult> toReprocess = job.getConfiguration().getToReprocess();
 | 
			
		||||
            if (toReprocess == null) {
 | 
			
		||||
                int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted
 | 
			
		||||
@ -127,11 +153,7 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Throwable e) {
 | 
			
		||||
            log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e);
 | 
			
		||||
            try {
 | 
			
		||||
                jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e));
 | 
			
		||||
            } catch (Throwable e2) {
 | 
			
		||||
                log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2);
 | 
			
		||||
            }
 | 
			
		||||
            jobService.markAsFailed(tenantId, jobId, e.getMessage());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -224,6 +246,25 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
        Thread.sleep(statsProcessingInterval); // todo: test with bigger interval
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendJobFinishedNotification(Job job) {
 | 
			
		||||
        NotificationTemplate template = DefaultNotifications.DefaultNotification.builder()
 | 
			
		||||
                .name("Job finished")
 | 
			
		||||
                .subject("${type} ${status}")
 | 
			
		||||
                .text("${description} ${status}: ${result}")
 | 
			
		||||
                .build().toTemplate();
 | 
			
		||||
        GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of(
 | 
			
		||||
                "type", job.getType().getTitle(),
 | 
			
		||||
                "description", job.getDescription(),
 | 
			
		||||
                "status", job.getStatus().name().toLowerCase(),
 | 
			
		||||
                "result", job.getResult().getDescription()
 | 
			
		||||
        ));
 | 
			
		||||
        notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private JobProcessor getJobProcessor(JobType jobType) {
 | 
			
		||||
        return jobProcessors.get(jobType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    private void destroy() {
 | 
			
		||||
        jobStatsConsumer.stop();
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,8 @@ public interface JobProcessor {
 | 
			
		||||
 | 
			
		||||
    void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception;
 | 
			
		||||
 | 
			
		||||
    default void onJobCompleted(Job job) {}
 | 
			
		||||
 | 
			
		||||
    JobType getType();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.service.job;
 | 
			
		||||
 | 
			
		||||
import org.assertj.core.api.Assertions;
 | 
			
		||||
import org.assertj.core.api.ThrowingConsumer;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.job.JobStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.job.JobType;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.Notification;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.controller.AbstractControllerTest;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobService;
 | 
			
		||||
@ -85,7 +87,7 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(tasksCount)
 | 
			
		||||
                        .taskProcessingTimeMs(1000)
 | 
			
		||||
@ -105,6 +107,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(job.getResult().getResults()).isEmpty();
 | 
			
		||||
            assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job completed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -115,7 +122,7 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(successfulTasks)
 | 
			
		||||
                        .failedTasksCount(failedTasks)
 | 
			
		||||
@ -136,6 +143,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(((DummyTaskResult) jobResult.getResults().get(1)).getFailure().getError()).isEqualTo("error3"); // last error
 | 
			
		||||
            assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job failed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -311,7 +323,7 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .generalError("Some error while submitting tasks")
 | 
			
		||||
                        .submittedTasksBeforeGeneralError(submittedTasks)
 | 
			
		||||
@ -326,6 +338,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks);
 | 
			
		||||
            assertThat(job.getResult().getTotalCount()).isNull();
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job failed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -426,6 +443,16 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkJobNotification(ThrowingConsumer<Notification> assertFunction) {
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            Notification notification = getMyNotifications(true, 100).stream()
 | 
			
		||||
                    .findFirst().orElse(null);
 | 
			
		||||
            assertThat(notification).isNotNull();
 | 
			
		||||
 | 
			
		||||
            assertFunction.accept(notification);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // todo: job with zero tasks
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -17,6 +17,14 @@ package org.thingsboard.server.common.data.job;
 | 
			
		||||
 | 
			
		||||
public class DummyJobResult extends JobResult {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        if (getGeneralError() != null) {
 | 
			
		||||
            return getGeneralError();
 | 
			
		||||
        }
 | 
			
		||||
        return getSuccessfulCount() + "/" + getTotalCount() + " successful, " + getFailedCount() + " failed";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public JobType getJobType() {
 | 
			
		||||
        return JobType.DUMMY;
 | 
			
		||||
 | 
			
		||||
@ -64,6 +64,9 @@ public abstract class JobResult implements Serializable {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    public abstract String getDescription();
 | 
			
		||||
 | 
			
		||||
    public abstract JobType getJobType();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,9 +15,16 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.job;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Getter
 | 
			
		||||
public enum JobType {
 | 
			
		||||
 | 
			
		||||
    DUMMY;
 | 
			
		||||
    DUMMY("Dummy job");
 | 
			
		||||
 | 
			
		||||
    private final String title;
 | 
			
		||||
 | 
			
		||||
    public String getTasksTopic() {
 | 
			
		||||
        return "tasks." + name().toLowerCase();
 | 
			
		||||
 | 
			
		||||
@ -117,7 +117,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
			
		||||
 | 
			
		||||
        boolean publishEvent = false;
 | 
			
		||||
        for (TaskResult taskResult : jobStats.getTaskResults()) {
 | 
			
		||||
           result.processTaskResult(taskResult);
 | 
			
		||||
            result.processTaskResult(taskResult);
 | 
			
		||||
 | 
			
		||||
            if (result.getCancellationTs() > 0) {
 | 
			
		||||
                if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) {
 | 
			
		||||
@ -134,8 +134,10 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
			
		||||
                    job.setStatus(CANCELLED);
 | 
			
		||||
                } else if (result.getFailedCount() > 0) {
 | 
			
		||||
                    job.setStatus(FAILED);
 | 
			
		||||
                    publishEvent = true;
 | 
			
		||||
                } else {
 | 
			
		||||
                    job.setStatus(COMPLETED);
 | 
			
		||||
                    publishEvent = true;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user