From f4cd471082f864c8ad7f8a1d3c789859d5843d58 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 2 May 2025 12:26:23 +0300 Subject: [PATCH] Notification on job finish --- .../entitiy/EntityStateSourcingListener.java | 2 +- .../server/service/job/DefaultJobManager.java | 65 +++++++++++++++---- .../server/service/job/JobProcessor.java | 2 + .../server/service/job/JobManagerTest.java | 33 +++++++++- .../common/data/job/DummyJobResult.java | 8 +++ .../server/common/data/job/JobResult.java | 3 + .../server/common/data/job/JobType.java | 9 ++- .../server/dao/job/DefaultJobService.java | 4 +- 8 files changed, 108 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index f70354c3a8..f56cc3e952 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -304,7 +304,7 @@ public class EntityStateSourcingListener { private void onJobUpdate(Job job) { jobManager.onJobUpdate(job); - if (job.getResult().getCancellationTs() > 0 || job.getStatus().isOneOf(JobStatus.FAILED)) { + if (job.getResult().getCancellationTs() > 0 || (job.getStatus().isOneOf(JobStatus.FAILED) && job.getResult().getGeneralError() != null)) { // task processors will add this job to the list of discarded tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 3d601a9358..3da2e2a9ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -18,12 +18,12 @@ package org.thingsboard.server.service.job; import jakarta.annotation.PreDestroy; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; @@ -33,8 +33,12 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.TaskResult; +import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo; +import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter; +import org.thingsboard.server.common.data.notification.template.NotificationTemplate; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; +import org.thingsboard.server.dao.notification.DefaultNotifications; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.queue.TbQueueCallback; @@ -65,6 +69,7 @@ public class DefaultJobManager implements JobManager { private final JobService jobService; private final JobStatsService jobStatsService; + private final NotificationCenter notificationCenter; private final Map jobProcessors; private final Map>> taskProducers; private final QueueConsumerManager> jobStatsConsumer; @@ -74,9 +79,11 @@ public class DefaultJobManager implements JobManager { @Value("${queue.tasks.stats.processing_interval_ms:5000}") private int statsProcessingInterval; - public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, TbCoreQueueFactory queueFactory, List jobProcessors) { + public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter, + TbCoreQueueFactory queueFactory, List jobProcessors) { this.jobService = jobService; this.jobStatsService = jobStatsService; + this.notificationCenter = notificationCenter; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); @@ -104,10 +111,29 @@ public class DefaultJobManager implements JobManager { @Override public void onJobUpdate(Job job) { - if (job.getStatus() == JobStatus.PENDING) { - executor.execute(() -> { - processJob(job); - }); + JobStatus status = job.getStatus(); + switch (status) { + case PENDING -> { + executor.execute(() -> { + try { + processJob(job); + } catch (Throwable e) { + log.error("Failed to process job update: {}", job, e); + } + }); + } + case COMPLETED, FAILED -> { + executor.execute(() -> { + try { + if (status == JobStatus.COMPLETED) { + getJobProcessor(job.getType()).onJobCompleted(job); + } + sendJobFinishedNotification(job); + } catch (Throwable e) { + log.error("Failed to process job update: {}", job, e); + } + }); + } } } @@ -115,7 +141,7 @@ public class DefaultJobManager implements JobManager { TenantId tenantId = job.getTenantId(); JobId jobId = job.getId(); try { - JobProcessor processor = jobProcessors.get(job.getType()); + JobProcessor processor = getJobProcessor(job.getType()); List toReprocess = job.getConfiguration().getToReprocess(); if (toReprocess == null) { int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted @@ -127,11 +153,7 @@ public class DefaultJobManager implements JobManager { } } catch (Throwable e) { log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e); - try { - jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e)); - } catch (Throwable e2) { - log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2); - } + jobService.markAsFailed(tenantId, jobId, e.getMessage()); } } @@ -224,6 +246,25 @@ public class DefaultJobManager implements JobManager { Thread.sleep(statsProcessingInterval); // todo: test with bigger interval } + private void sendJobFinishedNotification(Job job) { + NotificationTemplate template = DefaultNotifications.DefaultNotification.builder() + .name("Job finished") + .subject("${type} ${status}") + .text("${description} ${status}: ${result}") + .build().toTemplate(); + GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of( + "type", job.getType().getTitle(), + "description", job.getDescription(), + "status", job.getStatus().name().toLowerCase(), + "result", job.getResult().getDescription() + )); + notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info); + } + + private JobProcessor getJobProcessor(JobType jobType) { + return jobProcessors.get(jobType); + } + @PreDestroy private void destroy() { jobStatsConsumer.stop(); diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java index 301d4bf6eb..16ef5e404f 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java @@ -29,6 +29,8 @@ public interface JobProcessor { void reprocess(Job job, List taskFailures, Consumer> taskConsumer) throws Exception; + default void onJobCompleted(Job job) {} + JobType getType(); } diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index ee1eee6531..39c87dfd70 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.job; import org.assertj.core.api.Assertions; +import org.assertj.core.api.ThrowingConsumer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure; +import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.job.JobService; @@ -85,7 +87,7 @@ public class JobManagerTest extends AbstractControllerTest { .tenantId(tenantId) .type(JobType.DUMMY) .key("test-job") - .description("test job") + .description("Test job") .configuration(DummyJobConfiguration.builder() .successfulTasksCount(tasksCount) .taskProcessingTimeMs(1000) @@ -105,6 +107,11 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getResults()).isEmpty(); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); + + checkJobNotification(notification -> { + assertThat(notification.getSubject()).isEqualTo("Dummy job completed"); + assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed"); + }); } @Test @@ -115,7 +122,7 @@ public class JobManagerTest extends AbstractControllerTest { .tenantId(tenantId) .type(JobType.DUMMY) .key("test-job") - .description("test job") + .description("Test job") .configuration(DummyJobConfiguration.builder() .successfulTasksCount(successfulTasks) .failedTasksCount(failedTasks) @@ -136,6 +143,11 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(((DummyTaskResult) jobResult.getResults().get(1)).getFailure().getError()).isEqualTo("error3"); // last error assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); }); + + checkJobNotification(notification -> { + assertThat(notification.getSubject()).isEqualTo("Dummy job failed"); + assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed"); + }); } @Test @@ -311,7 +323,7 @@ public class JobManagerTest extends AbstractControllerTest { .tenantId(tenantId) .type(JobType.DUMMY) .key("test-job") - .description("test job") + .description("Test job") .configuration(DummyJobConfiguration.builder() .generalError("Some error while submitting tasks") .submittedTasksBeforeGeneralError(submittedTasks) @@ -326,6 +338,11 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks); assertThat(job.getResult().getTotalCount()).isNull(); }); + + checkJobNotification(notification -> { + assertThat(notification.getSubject()).isEqualTo("Dummy job failed"); + assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks"); + }); } @Test @@ -426,6 +443,16 @@ public class JobManagerTest extends AbstractControllerTest { }); } + private void checkJobNotification(ThrowingConsumer assertFunction) { + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Notification notification = getMyNotifications(true, 100).stream() + .findFirst().orElse(null); + assertThat(notification).isNotNull(); + + assertFunction.accept(notification); + }); + } + // todo: job with zero tasks } \ No newline at end of file diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java index 031a733d51..3a9aabae76 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java @@ -17,6 +17,14 @@ package org.thingsboard.server.common.data.job; public class DummyJobResult extends JobResult { + @Override + public String getDescription() { + if (getGeneralError() != null) { + return getGeneralError(); + } + return getSuccessfulCount() + "/" + getTotalCount() + " successful, " + getFailedCount() + " failed"; + } + @Override public JobType getJobType() { return JobType.DUMMY; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index 4e4787bbe5..534e3587bb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -64,6 +64,9 @@ public abstract class JobResult implements Serializable { } } + @JsonIgnore + public abstract String getDescription(); + public abstract JobType getJobType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobType.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobType.java index 9e8e9fa7e5..c2c461d12b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobType.java @@ -15,9 +15,16 @@ */ package org.thingsboard.server.common.data.job; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter public enum JobType { - DUMMY; + DUMMY("Dummy job"); + + private final String title; public String getTasksTopic() { return "tasks." + name().toLowerCase(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 07b5aeadf9..73e0c5dff4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -117,7 +117,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi boolean publishEvent = false; for (TaskResult taskResult : jobStats.getTaskResults()) { - result.processTaskResult(taskResult); + result.processTaskResult(taskResult); if (result.getCancellationTs() > 0) { if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) { @@ -134,8 +134,10 @@ public class DefaultJobService extends AbstractEntityService implements JobServi job.setStatus(CANCELLED); } else if (result.getFailedCount() > 0) { job.setStatus(FAILED); + publishEvent = true; } else { job.setStatus(COMPLETED); + publishEvent = true; } } }