Introduce entity id for jobs
This commit is contained in:
		
							parent
							
								
									df2d8cc895
								
							
						
					
					
						commit
						b618249c62
					
				@ -26,6 +26,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 | 
			
		||||
import org.springframework.web.bind.annotation.RequestMapping;
 | 
			
		||||
import org.springframework.web.bind.annotation.RequestParam;
 | 
			
		||||
import org.springframework.web.bind.annotation.RestController;
 | 
			
		||||
import org.thingsboard.rule.engine.api.JobManager;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
@ -36,7 +37,6 @@ import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.rule.engine.api.JobManager;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -76,12 +76,18 @@ public class JobController extends BaseController {
 | 
			
		||||
                                 @Parameter(description = SORT_ORDER_DESCRIPTION)
 | 
			
		||||
                                 @RequestParam(required = false) String sortOrder,
 | 
			
		||||
                                 @RequestParam(required = false) List<JobType> types,
 | 
			
		||||
                                 @RequestParam(required = false) List<JobStatus> statuses) throws ThingsboardException {
 | 
			
		||||
                                 @RequestParam(required = false) List<JobStatus> statuses,
 | 
			
		||||
                                 @RequestParam(required = false) List<UUID> entities,
 | 
			
		||||
                                 @RequestParam(required = false) Long startTime,
 | 
			
		||||
                                 @RequestParam(required = false) Long endTime) throws ThingsboardException {
 | 
			
		||||
        // todo check permissions
 | 
			
		||||
        PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
 | 
			
		||||
        JobFilter filter = JobFilter.builder()
 | 
			
		||||
                .types(types)
 | 
			
		||||
                .statuses(statuses)
 | 
			
		||||
                .entities(entities)
 | 
			
		||||
                .startTime(startTime)
 | 
			
		||||
                .endTime(endTime)
 | 
			
		||||
                .build();
 | 
			
		||||
        return jobService.findJobsByFilter(getTenantId(), filter, pageLink);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,43 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.housekeeper.processor;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
 | 
			
		||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobService;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class JobsDeletionTaskProcessor extends HousekeeperTaskProcessor<HousekeeperTask> {
 | 
			
		||||
 | 
			
		||||
    private final JobService jobService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(HousekeeperTask task) throws Exception {
 | 
			
		||||
        int deletedCount = jobService.deleteJobsByEntityId(task.getTenantId(), task.getEntityId());
 | 
			
		||||
        log.debug("[{}][{}][{}] Deleted {} jobs", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), deletedCount);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public HousekeeperTaskType getTaskType() {
 | 
			
		||||
        return HousekeeperTaskType.DELETE_JOBS;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -22,7 +22,6 @@ import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.rule.engine.api.JobManager;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
@ -32,13 +31,9 @@ import org.thingsboard.server.common.data.job.JobStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.job.JobType;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.Task;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.TaskResult;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationTemplate;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobService;
 | 
			
		||||
import org.thingsboard.server.dao.notification.DefaultNotifications;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
 | 
			
		||||
@ -63,19 +58,17 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
 | 
			
		||||
    private final JobService jobService;
 | 
			
		||||
    private final JobStatsService jobStatsService;
 | 
			
		||||
    private final NotificationCenter notificationCenter;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
    private final TasksQueueConfig queueConfig;
 | 
			
		||||
    private final Map<JobType, JobProcessor> jobProcessors;
 | 
			
		||||
    private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
 | 
			
		||||
    private final ExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter,
 | 
			
		||||
                             PartitionService partitionService, TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig,
 | 
			
		||||
    public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, PartitionService partitionService,
 | 
			
		||||
                             TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig,
 | 
			
		||||
                             List<JobProcessor> jobProcessors) {
 | 
			
		||||
        this.jobService = jobService;
 | 
			
		||||
        this.jobStatsService = jobStatsService;
 | 
			
		||||
        this.notificationCenter = notificationCenter;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.queueConfig = queueConfig;
 | 
			
		||||
        this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
 | 
			
		||||
@ -105,10 +98,7 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
            case COMPLETED, FAILED -> {
 | 
			
		||||
                executor.execute(() -> {
 | 
			
		||||
                    try {
 | 
			
		||||
                        if (status == JobStatus.COMPLETED) {
 | 
			
		||||
                            getJobProcessor(job.getType()).onJobCompleted(job);
 | 
			
		||||
                        }
 | 
			
		||||
                        sendJobFinishedNotification(job);
 | 
			
		||||
                        getJobProcessor(job.getType()).onJobFinished(job);
 | 
			
		||||
                    } catch (Throwable e) {
 | 
			
		||||
                        log.error("Failed to process job update: {}", job, e);
 | 
			
		||||
                    }
 | 
			
		||||
@ -203,22 +193,6 @@ public class DefaultJobManager implements JobManager {
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendJobFinishedNotification(Job job) {
 | 
			
		||||
        NotificationTemplate template = DefaultNotifications.DefaultNotification.builder()
 | 
			
		||||
                .name("Job finished")
 | 
			
		||||
                .subject("${type} task ${status}")
 | 
			
		||||
                .text("${description} ${status}: ${result}")
 | 
			
		||||
                .build().toTemplate();
 | 
			
		||||
        GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of(
 | 
			
		||||
                "type", job.getType().getTitle(),
 | 
			
		||||
                "description", job.getDescription(),
 | 
			
		||||
                "status", job.getStatus().name().toLowerCase(),
 | 
			
		||||
                "result", job.getResult().getDescription()
 | 
			
		||||
        ));
 | 
			
		||||
        // todo: button to see details (forward to jobs page)
 | 
			
		||||
        notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private JobProcessor getJobProcessor(JobType jobType) {
 | 
			
		||||
        return jobProcessors.get(jobType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,7 @@ public interface JobProcessor {
 | 
			
		||||
 | 
			
		||||
    void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception;
 | 
			
		||||
 | 
			
		||||
    default void onJobCompleted(Job job) {}
 | 
			
		||||
    default void onJobFinished(Job job) {}
 | 
			
		||||
 | 
			
		||||
    JobType getType();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1272,8 +1272,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
 | 
			
		||||
        return doGetTypedWithPageLink("/api/jobs?", new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected List<Job> findJobs(JobType... types) throws Exception {
 | 
			
		||||
        return doGetTypedWithPageLink("/api/jobs?types=" + Arrays.stream(types).map(Enum::name).collect(Collectors.joining(",")) + "&",
 | 
			
		||||
    protected List<Job> findJobs(List<JobType> types, List<UUID> entities) throws Exception {
 | 
			
		||||
        return doGetTypedWithPageLink("/api/jobs?types=" + types.stream().map(Enum::name).collect(Collectors.joining(",")) +
 | 
			
		||||
                                      "&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&",
 | 
			
		||||
                new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.job;
 | 
			
		||||
 | 
			
		||||
import org.assertj.core.api.ThrowingConsumer;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
@ -24,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.boot.test.mock.mockito.SpyBean;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
import org.thingsboard.rule.engine.api.JobManager;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
 | 
			
		||||
@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.job.JobStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.job.JobType;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.DummyTaskResult;
 | 
			
		||||
import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.Notification;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.controller.AbstractControllerTest;
 | 
			
		||||
import org.thingsboard.server.dao.job.JobDao;
 | 
			
		||||
@ -53,6 +52,7 @@ import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.Mockito.doAnswer;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 | 
			
		||||
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
@ -72,9 +72,14 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private JobDao jobDao;
 | 
			
		||||
 | 
			
		||||
    private TenantId tenantId;
 | 
			
		||||
    private Device jobEntity;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        loginTenantAdmin();
 | 
			
		||||
        tenantId = super.tenantId;
 | 
			
		||||
        jobEntity = createDevice("Test", "Test");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @After
 | 
			
		||||
@ -84,15 +89,9 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitJob_allTasksSuccessful() {
 | 
			
		||||
        int tasksCount = 5;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(tasksCount)
 | 
			
		||||
                        .taskProcessingTimeMs(1000)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(tasksCount)
 | 
			
		||||
                .taskProcessingTimeMs(1000)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -108,29 +107,18 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(job.getResult().getResults()).isEmpty();
 | 
			
		||||
            assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job task completed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitJob_someTasksPermanentlyFailed() {
 | 
			
		||||
        int successfulTasks = 3;
 | 
			
		||||
        int failedTasks = 2;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(successfulTasks)
 | 
			
		||||
                        .failedTasksCount(failedTasks)
 | 
			
		||||
                        .errors(List.of("error1", "error2", "error3"))
 | 
			
		||||
                        .retries(2)
 | 
			
		||||
                        .taskProcessingTimeMs(100)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(successfulTasks)
 | 
			
		||||
                .failedTasksCount(failedTasks)
 | 
			
		||||
                .errors(List.of("error1", "error2", "error3"))
 | 
			
		||||
                .retries(2)
 | 
			
		||||
                .taskProcessingTimeMs(100)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -145,24 +133,13 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            });
 | 
			
		||||
            assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount());
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job task failed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitJob_taskTimeout() {
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(1)
 | 
			
		||||
                        .taskProcessingTimeMs(5000) // bigger than DummyTaskProcessor.getTaskProcessingTimeout()
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(1)
 | 
			
		||||
                .taskProcessingTimeMs(5000) // bigger than DummyTaskProcessor.getTaskProcessingTimeout()
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -177,15 +154,9 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testCancelJob_whileRunning() throws Exception {
 | 
			
		||||
        int tasksCount = 100;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(tasksCount)
 | 
			
		||||
                        .taskProcessingTimeMs(100)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(tasksCount)
 | 
			
		||||
                .taskProcessingTimeMs(100)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        Thread.sleep(500);
 | 
			
		||||
@ -203,15 +174,9 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testCancelJob_simulateTaskProcessorRestart() throws Exception {
 | 
			
		||||
        int tasksCount = 10;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(tasksCount)
 | 
			
		||||
                        .taskProcessingTimeMs(500)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(tasksCount)
 | 
			
		||||
                .taskProcessingTimeMs(500)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        // simulate cancelled jobs are forgotten
 | 
			
		||||
@ -239,16 +204,10 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
        createDifferentTenant();
 | 
			
		||||
 | 
			
		||||
        TenantId tenantId = this.differentTenantId;
 | 
			
		||||
        jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(1000)
 | 
			
		||||
                        .taskProcessingTimeMs(500)
 | 
			
		||||
                        .build())
 | 
			
		||||
        this.tenantId = this.differentTenantId;
 | 
			
		||||
        submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(1000)
 | 
			
		||||
                .taskProcessingTimeMs(500)
 | 
			
		||||
                .build());
 | 
			
		||||
 | 
			
		||||
        Thread.sleep(2000);
 | 
			
		||||
@ -261,25 +220,18 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitMultipleJobs() {
 | 
			
		||||
    public void testSubmitMultipleJobs() throws Exception {
 | 
			
		||||
        int tasksCount = 3;
 | 
			
		||||
        int jobsCount = 3;
 | 
			
		||||
        for (int i = 1; i <= jobsCount; i++) {
 | 
			
		||||
            Job job = Job.builder()
 | 
			
		||||
                    .tenantId(tenantId)
 | 
			
		||||
                    .type(JobType.DUMMY)
 | 
			
		||||
                    .key("test-job-" + i)
 | 
			
		||||
                    .description("test job")
 | 
			
		||||
                    .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                            .successfulTasksCount(tasksCount)
 | 
			
		||||
                            .taskProcessingTimeMs(1000)
 | 
			
		||||
                            .build())
 | 
			
		||||
                    .build();
 | 
			
		||||
            jobManager.submitJob(job);
 | 
			
		||||
            submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                    .successfulTasksCount(tasksCount)
 | 
			
		||||
                    .taskProcessingTimeMs(1000)
 | 
			
		||||
                    .build(), "test-job-" + i);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            List<Job> jobs = findJobs(JobType.DUMMY);
 | 
			
		||||
            List<Job> jobs = findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId()));
 | 
			
		||||
            assertThat(jobs).hasSize(jobsCount);
 | 
			
		||||
            Job firstJob = jobs.get(2); // ordered by createdTime descending
 | 
			
		||||
            assertThat(firstJob.getStatus()).isEqualTo(JobStatus.RUNNING);
 | 
			
		||||
@ -297,6 +249,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
                assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        doDelete("/api/device/" + jobEntity.getId()).andExpect(status().isOk());
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            assertThat(findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId()))).isEmpty();
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -305,17 +262,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        int jobsCount = 3;
 | 
			
		||||
        List<JobId> jobIds = new ArrayList<>();
 | 
			
		||||
        for (int i = 1; i <= jobsCount; i++) {
 | 
			
		||||
            Job job = Job.builder()
 | 
			
		||||
                    .tenantId(tenantId)
 | 
			
		||||
                    .type(JobType.DUMMY)
 | 
			
		||||
                    .key("test-job-" + i)
 | 
			
		||||
                    .description("test job")
 | 
			
		||||
                    .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                            .successfulTasksCount(tasksCount)
 | 
			
		||||
                            .taskProcessingTimeMs(1000)
 | 
			
		||||
                            .build())
 | 
			
		||||
                    .build();
 | 
			
		||||
            jobIds.add(jobManager.submitJob(job).getId());
 | 
			
		||||
            Job job = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                    .successfulTasksCount(tasksCount)
 | 
			
		||||
                    .taskProcessingTimeMs(1000)
 | 
			
		||||
                    .build(), "test-job-" + i);
 | 
			
		||||
            jobIds.add(job.getId());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (int i = 1; i < jobIds.size(); i++) {
 | 
			
		||||
@ -343,16 +294,10 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitJob_generalError() {
 | 
			
		||||
        int submittedTasks = 100;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .generalError("Some error while submitting tasks")
 | 
			
		||||
                        .submittedTasksBeforeGeneralError(submittedTasks)
 | 
			
		||||
                        .taskProcessingTimeMs(10)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .generalError("Some error while submitting tasks")
 | 
			
		||||
                .submittedTasksBeforeGeneralError(submittedTasks)
 | 
			
		||||
                .taskProcessingTimeMs(10)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -362,24 +307,13 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(job.getResult().getDiscardedCount()).isZero();
 | 
			
		||||
            assertThat(job.getResult().getTotalCount()).isNull();
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        checkJobNotification(notification -> {
 | 
			
		||||
            assertThat(notification.getSubject()).isEqualTo("Dummy job task failed");
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks");
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSubmitJob_immediateGeneralError() {
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .generalError("Some error while submitting tasks")
 | 
			
		||||
                        .submittedTasksBeforeGeneralError(0)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .generalError("Some error while submitting tasks")
 | 
			
		||||
                .submittedTasksBeforeGeneralError(0)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -395,16 +329,10 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testReprocessJob_generalError() throws Exception {
 | 
			
		||||
        int submittedTasks = 100;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("Test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .generalError("Some error while submitting tasks")
 | 
			
		||||
                        .submittedTasksBeforeGeneralError(submittedTasks)
 | 
			
		||||
                        .taskProcessingTimeMs(10)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .generalError("Some error while submitting tasks")
 | 
			
		||||
                .submittedTasksBeforeGeneralError(submittedTasks)
 | 
			
		||||
                .taskProcessingTimeMs(10)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -437,17 +365,11 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        int successfulTasks = 3;
 | 
			
		||||
        int failedTasks = 2;
 | 
			
		||||
        int totalTasksCount = successfulTasks + failedTasks;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(successfulTasks)
 | 
			
		||||
                        .failedTasksCount(failedTasks)
 | 
			
		||||
                        .errors(List.of("error"))
 | 
			
		||||
                        .taskProcessingTimeMs(100)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(successfulTasks)
 | 
			
		||||
                .failedTasksCount(failedTasks)
 | 
			
		||||
                .errors(List.of("error"))
 | 
			
		||||
                .taskProcessingTimeMs(100)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -484,18 +406,12 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        int failedTasks = 2;
 | 
			
		||||
        int permanentlyFailedTasks = 1;
 | 
			
		||||
        int totalTasksCount = successfulTasks + failedTasks + permanentlyFailedTasks;
 | 
			
		||||
        JobId jobId = jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key("test-job")
 | 
			
		||||
                .description("test job")
 | 
			
		||||
                .configuration(DummyJobConfiguration.builder()
 | 
			
		||||
                        .successfulTasksCount(successfulTasks)
 | 
			
		||||
                        .failedTasksCount(failedTasks)
 | 
			
		||||
                        .permanentlyFailedTasksCount(permanentlyFailedTasks)
 | 
			
		||||
                        .errors(List.of("error"))
 | 
			
		||||
                        .taskProcessingTimeMs(100)
 | 
			
		||||
                        .build())
 | 
			
		||||
        JobId jobId = submitJob(DummyJobConfiguration.builder()
 | 
			
		||||
                .successfulTasksCount(successfulTasks)
 | 
			
		||||
                .failedTasksCount(failedTasks)
 | 
			
		||||
                .permanentlyFailedTasksCount(permanentlyFailedTasks)
 | 
			
		||||
                .errors(List.of("error"))
 | 
			
		||||
                .taskProcessingTimeMs(100)
 | 
			
		||||
                .build()).getId();
 | 
			
		||||
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
@ -534,14 +450,18 @@ public class JobManagerTest extends AbstractControllerTest {
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkJobNotification(ThrowingConsumer<Notification> assertFunction) {
 | 
			
		||||
        await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            Notification notification = getMyNotifications(true, 100).stream()
 | 
			
		||||
                    .findFirst().orElse(null);
 | 
			
		||||
            assertThat(notification).isNotNull();
 | 
			
		||||
    private Job submitJob(DummyJobConfiguration configuration) {
 | 
			
		||||
        return submitJob(configuration, "test-job");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
            assertFunction.accept(notification);
 | 
			
		||||
        });
 | 
			
		||||
    private Job submitJob(DummyJobConfiguration configuration, String key) {
 | 
			
		||||
        return jobManager.submitJob(Job.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .type(JobType.DUMMY)
 | 
			
		||||
                .key(key)
 | 
			
		||||
                .entityId(jobEntity.getId())
 | 
			
		||||
                .configuration(configuration)
 | 
			
		||||
                .build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<DummyTaskFailure> getFailures(JobResult jobResult) {
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.job;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
@ -42,4 +43,6 @@ public interface JobService extends EntityDaoService {
 | 
			
		||||
 | 
			
		||||
    void deleteJob(TenantId tenantId, JobId jobId);
 | 
			
		||||
 | 
			
		||||
    int deleteJobsByEntityId(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -85,6 +85,10 @@ public class HousekeeperTask implements Serializable {
 | 
			
		||||
        return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_CALCULATED_FIELDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static HousekeeperTask deleteJobs(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_JOBS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        return taskType.getDescription() + " for " + entityId.getEntityType().getNormalName().toLowerCase() + " " + entityId.getId();
 | 
			
		||||
 | 
			
		||||
@ -31,7 +31,8 @@ public enum HousekeeperTaskType {
 | 
			
		||||
    UNASSIGN_ALARMS("alarms unassigning"),
 | 
			
		||||
    DELETE_TENANT_ENTITIES("tenant entities deletion"),
 | 
			
		||||
    DELETE_ENTITIES("entities deletion"),
 | 
			
		||||
    DELETE_CALCULATED_FIELDS("calculated fields deletion");
 | 
			
		||||
    DELETE_CALCULATED_FIELDS("calculated fields deletion"),
 | 
			
		||||
    DELETE_JOBS("jobs deletion");
 | 
			
		||||
 | 
			
		||||
    private final String description;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,14 +17,6 @@ package org.thingsboard.server.common.data.job;
 | 
			
		||||
 | 
			
		||||
public class DummyJobResult extends JobResult {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        if (getGeneralError() != null) {
 | 
			
		||||
            return getGeneralError();
 | 
			
		||||
        }
 | 
			
		||||
        return getSuccessfulCount() + "/" + getTotalCount() + " successful, " + getFailedCount() + " failed";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public JobType getJobType() {
 | 
			
		||||
        return JobType.DUMMY;
 | 
			
		||||
 | 
			
		||||
@ -24,10 +24,13 @@ import lombok.EqualsAndHashCode;
 | 
			
		||||
import lombok.NoArgsConstructor;
 | 
			
		||||
import lombok.ToString;
 | 
			
		||||
import org.thingsboard.server.common.data.BaseData;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.HasTenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@ -42,8 +45,8 @@ public class Job extends BaseData<JobId> implements HasTenantId {
 | 
			
		||||
    private JobType type;
 | 
			
		||||
    @NotBlank
 | 
			
		||||
    private String key;
 | 
			
		||||
    @NotBlank
 | 
			
		||||
    private String description;
 | 
			
		||||
    @NotNull
 | 
			
		||||
    private EntityId entityId;
 | 
			
		||||
    @NotNull
 | 
			
		||||
    private JobStatus status;
 | 
			
		||||
    @NotNull
 | 
			
		||||
@ -52,12 +55,16 @@ public class Job extends BaseData<JobId> implements HasTenantId {
 | 
			
		||||
    @NotNull
 | 
			
		||||
    private JobResult result;
 | 
			
		||||
 | 
			
		||||
    public static final Set<EntityType> SUPPORTED_ENTITY_TYPES = Set.of(
 | 
			
		||||
            EntityType.DEVICE, EntityType.ASSET, EntityType.DEVICE_PROFILE, EntityType.ASSET_PROFILE
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    @Builder(toBuilder = true)
 | 
			
		||||
    public Job(TenantId tenantId, JobType type, String key, String description, JobConfiguration configuration) {
 | 
			
		||||
    public Job(TenantId tenantId, JobType type, String key, EntityId entityId, JobConfiguration configuration) {
 | 
			
		||||
        this.tenantId = tenantId;
 | 
			
		||||
        this.type = type;
 | 
			
		||||
        this.key = key;
 | 
			
		||||
        this.description = description;
 | 
			
		||||
        this.entityId = entityId;
 | 
			
		||||
        this.configuration = configuration;
 | 
			
		||||
        this.configuration.setTasksKey(UUID.randomUUID().toString());
 | 
			
		||||
        presetResult();
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Builder
 | 
			
		||||
@ -26,5 +27,8 @@ public class JobFilter {
 | 
			
		||||
 | 
			
		||||
    private final List<JobType> types;
 | 
			
		||||
    private final List<JobStatus> statuses;
 | 
			
		||||
    private final List<UUID> entities;
 | 
			
		||||
    private final Long startTime;
 | 
			
		||||
    private final Long endTime;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -66,9 +66,6 @@ public abstract class JobResult implements Serializable {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    public abstract String getDescription();
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    public abstract JobType getJobType();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
 | 
			
		||||
import org.thingsboard.server.dao.eventsourcing.ActionCause;
 | 
			
		||||
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
 | 
			
		||||
@ -76,6 +77,9 @@ public class CleanUpService {
 | 
			
		||||
        submitTask(HousekeeperTask.deleteEvents(tenantId, entityId));
 | 
			
		||||
        submitTask(HousekeeperTask.deleteAlarms(tenantId, entityId));
 | 
			
		||||
        submitTask(HousekeeperTask.deleteCalculatedFields(tenantId, entityId));
 | 
			
		||||
        if (Job.SUPPORTED_ENTITY_TYPES.contains(entityId.getEntityType())) {
 | 
			
		||||
            submitTask(HousekeeperTask.deleteJobs(tenantId, entityId));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void removeTenantEntities(TenantId tenantId, EntityType... entityTypes) {
 | 
			
		||||
 | 
			
		||||
@ -157,6 +157,10 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
			
		||||
 | 
			
		||||
    private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) {
 | 
			
		||||
        ConstraintValidator.validateFields(job);
 | 
			
		||||
        if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) {
 | 
			
		||||
            throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        job = jobDao.save(tenantId, job);
 | 
			
		||||
        if (publishEvent) {
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder()
 | 
			
		||||
@ -203,6 +207,11 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
			
		||||
        jobDao.removeById(tenantId, jobId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public int deleteJobsByEntityId(TenantId tenantId, EntityId entityId) { // TODO: cancel all jobs for this entity
 | 
			
		||||
        return jobDao.removeByEntityId(tenantId, entityId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Job findForUpdate(TenantId tenantId, JobId jobId) {
 | 
			
		||||
        return jobDao.findByIdForUpdate(tenantId, jobId);
 | 
			
		||||
    }
 | 
			
		||||
@ -219,7 +228,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteByTenantId(TenantId tenantId) {
 | 
			
		||||
        jobDao.deleteByTenantId(tenantId);
 | 
			
		||||
        jobDao.removeByTenantId(tenantId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.job;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
@ -37,8 +38,12 @@ public interface JobDao extends Dao<Job> {
 | 
			
		||||
 | 
			
		||||
    boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses);
 | 
			
		||||
 | 
			
		||||
    boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses);
 | 
			
		||||
 | 
			
		||||
    Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status);
 | 
			
		||||
 | 
			
		||||
    void deleteByTenantId(TenantId tenantId);
 | 
			
		||||
    void removeByTenantId(TenantId tenantId);
 | 
			
		||||
 | 
			
		||||
    int removeByEntityId(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -745,7 +745,8 @@ public class ModelConstants {
 | 
			
		||||
    public static final String JOB_TABLE_NAME = "job";
 | 
			
		||||
    public static final String JOB_TYPE_PROPERTY = "type";
 | 
			
		||||
    public static final String JOB_KEY_PROPERTY = "key";
 | 
			
		||||
    public static final String JOB_DESCRIPTION_PROPERTY = "description";
 | 
			
		||||
    public static final String JOB_ENTITY_ID_PROPERTY = "entity_id";
 | 
			
		||||
    public static final String JOB_ENTITY_TYPE_PROPERTY = "entity_type";
 | 
			
		||||
    public static final String JOB_STATUS_PROPERTY = "status";
 | 
			
		||||
    public static final String JOB_CONFIGURATION_PROPERTY = "configuration";
 | 
			
		||||
    public static final String JOB_RESULT_PROPERTY = "result";
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,8 @@ import jakarta.persistence.Table;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import lombok.NoArgsConstructor;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
import org.thingsboard.server.common.data.job.JobConfiguration;
 | 
			
		||||
@ -54,8 +56,12 @@ public class JobEntity extends BaseSqlEntity<Job> {
 | 
			
		||||
    @Column(name = ModelConstants.JOB_KEY_PROPERTY, nullable = false)
 | 
			
		||||
    private String key;
 | 
			
		||||
 | 
			
		||||
    @Column(name = ModelConstants.JOB_DESCRIPTION_PROPERTY, nullable = false)
 | 
			
		||||
    private String description;
 | 
			
		||||
    @Column(name = ModelConstants.JOB_ENTITY_ID_PROPERTY, nullable = false)
 | 
			
		||||
    private UUID entityId;
 | 
			
		||||
 | 
			
		||||
    @Enumerated(EnumType.STRING)
 | 
			
		||||
    @Column(name = ModelConstants.JOB_ENTITY_TYPE_PROPERTY, nullable = false)
 | 
			
		||||
    private EntityType entityType;
 | 
			
		||||
 | 
			
		||||
    @Enumerated(EnumType.STRING)
 | 
			
		||||
    @Column(name = ModelConstants.JOB_STATUS_PROPERTY, nullable = false)
 | 
			
		||||
@ -74,7 +80,8 @@ public class JobEntity extends BaseSqlEntity<Job> {
 | 
			
		||||
        this.tenantId = getTenantUuid(job.getTenantId());
 | 
			
		||||
        this.type = job.getType();
 | 
			
		||||
        this.key = job.getKey();
 | 
			
		||||
        this.description = job.getDescription();
 | 
			
		||||
        this.entityId = job.getEntityId().getId();
 | 
			
		||||
        this.entityType = job.getEntityId().getEntityType();
 | 
			
		||||
        this.status = job.getStatus();
 | 
			
		||||
        this.configuration = toJson(job.getConfiguration());
 | 
			
		||||
        this.result = toJson(job.getResult());
 | 
			
		||||
@ -88,7 +95,7 @@ public class JobEntity extends BaseSqlEntity<Job> {
 | 
			
		||||
        job.setTenantId(getTenantId(tenantId));
 | 
			
		||||
        job.setType(type);
 | 
			
		||||
        job.setKey(key);
 | 
			
		||||
        job.setDescription(description);
 | 
			
		||||
        job.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId));
 | 
			
		||||
        job.setStatus(status);
 | 
			
		||||
        job.setConfiguration(fromJson(configuration, JobConfiguration.class));
 | 
			
		||||
        job.setResult(fromJson(result, JobResult.class));
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql.job;
 | 
			
		||||
 | 
			
		||||
import org.springframework.data.domain.Limit;
 | 
			
		||||
import org.springframework.data.domain.Page;
 | 
			
		||||
import org.springframework.data.domain.Pageable;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
@ -34,26 +35,34 @@ import java.util.UUID;
 | 
			
		||||
public interface JobRepository extends JpaRepository<JobEntity, UUID> {
 | 
			
		||||
 | 
			
		||||
    @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId " +
 | 
			
		||||
           "AND (:types IS NULL OR j.type IN (:types)) AND (:statuses IS NULL OR j.status IN (:statuses)) " +
 | 
			
		||||
           "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " +
 | 
			
		||||
           "OR ilike(j.description, concat('%', :searchText, '%')) = true)")
 | 
			
		||||
    Page<JobEntity> findByTenantIdAndTypesAndStatusesAndSearchText(@Param("tenantId") UUID tenantId,
 | 
			
		||||
                                                                   @Param("types") List<JobType> types,
 | 
			
		||||
                                                                   @Param("statuses") List<JobStatus> statuses,
 | 
			
		||||
                                                                   @Param("searchText") String searchText,
 | 
			
		||||
                                                                   Pageable pageable);
 | 
			
		||||
           "AND (:types IS NULL OR j.type IN (:types)) " +
 | 
			
		||||
           "AND (:statuses IS NULL OR j.status IN (:statuses)) " +
 | 
			
		||||
           "AND (:entities IS NULL OR j.entityId IN :entities) " +
 | 
			
		||||
           "AND (:startTime <= 0 OR j.createdTime >= :startTime) " +
 | 
			
		||||
           "AND (:endTime <= 0 OR j.createdTime <= :endTime) " +
 | 
			
		||||
           "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true)")
 | 
			
		||||
    Page<JobEntity> findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(@Param("tenantId") UUID tenantId,
 | 
			
		||||
                                                                                     @Param("types") List<JobType> types,
 | 
			
		||||
                                                                                     @Param("statuses") List<JobStatus> statuses,
 | 
			
		||||
                                                                                     @Param("entities") List<UUID> entities,
 | 
			
		||||
                                                                                     @Param("startTime") long startTime,
 | 
			
		||||
                                                                                     @Param("endTime") long endTime,
 | 
			
		||||
                                                                                     @Param("searchText") String searchText,
 | 
			
		||||
                                                                                     Pageable pageable);
 | 
			
		||||
 | 
			
		||||
    @Query(value = "SELECT * FROM job j WHERE j.id = :id FOR UPDATE", nativeQuery = true)
 | 
			
		||||
    JobEntity findByIdForUpdate(UUID id);
 | 
			
		||||
 | 
			
		||||
    @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId AND j.key = :key " +
 | 
			
		||||
           "ORDER BY j.createdTime DESC")
 | 
			
		||||
    JobEntity findLatestByTenantIdAndKey(@Param("tenantId") UUID tenantId, @Param("key") String key);
 | 
			
		||||
    JobEntity findLatestByTenantIdAndKey(@Param("tenantId") UUID tenantId, @Param("key") String key, Limit limit);
 | 
			
		||||
 | 
			
		||||
    boolean existsByTenantIdAndKeyAndStatusIn(UUID tenantId, String key, List<JobStatus> statuses);
 | 
			
		||||
 | 
			
		||||
    boolean existsByTenantIdAndTypeAndStatusIn(UUID tenantId, JobType type, List<JobStatus> statuses);
 | 
			
		||||
 | 
			
		||||
    boolean existsByTenantIdAndEntityIdAndStatusIn(UUID tenantId, UUID entityId, List<JobStatus> statuses);
 | 
			
		||||
 | 
			
		||||
    @Query(value = "SELECT * FROM job j WHERE j.tenant_id = :tenantId AND j.type = :type " +
 | 
			
		||||
                   "AND j.status = :status ORDER BY j.created_time ASC, j.id ASC LIMIT 1 FOR UPDATE", nativeQuery = true)
 | 
			
		||||
    JobEntity findOldestByTenantIdAndTypeAndStatusForUpdate(UUID tenantId, String type, String status);
 | 
			
		||||
@ -63,4 +72,9 @@ public interface JobRepository extends JpaRepository<JobEntity, UUID> {
 | 
			
		||||
    @Query("DELETE FROM JobEntity j WHERE j.tenantId = :tenantId")
 | 
			
		||||
    void deleteByTenantId(UUID tenantId);
 | 
			
		||||
 | 
			
		||||
    @Transactional
 | 
			
		||||
    @Modifying
 | 
			
		||||
    @Query("DELETE FROM JobEntity j WHERE j.entityId = :entityId")
 | 
			
		||||
    int deleteByEntityId(UUID entityId);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,9 +17,11 @@ package org.thingsboard.server.dao.sql.job;
 | 
			
		||||
 | 
			
		||||
import com.google.common.base.Strings;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.data.domain.Limit;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.JobId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
@ -47,9 +49,12 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<Job> findByTenantIdAndFilter(TenantId tenantId, JobFilter filter, PageLink pageLink) {
 | 
			
		||||
        return DaoUtil.toPageData(jobRepository.findByTenantIdAndTypesAndStatusesAndSearchText(tenantId.getId(),
 | 
			
		||||
        return DaoUtil.toPageData(jobRepository.findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(tenantId.getId(),
 | 
			
		||||
                CollectionsUtil.isEmpty(filter.getTypes()) ? null : filter.getTypes(),
 | 
			
		||||
                CollectionsUtil.isEmpty(filter.getStatuses()) ? null : filter.getStatuses(),
 | 
			
		||||
                CollectionsUtil.isEmpty(filter.getEntities()) ? null : filter.getEntities(),
 | 
			
		||||
                filter.getStartTime() != null ? filter.getStartTime() : 0,
 | 
			
		||||
                filter.getEndTime() != null ? filter.getEndTime() : 0,
 | 
			
		||||
                Strings.emptyToNull(pageLink.getTextSearch()), DaoUtil.toPageable(pageLink)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -60,7 +65,7 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Job findLatestByTenantIdAndKey(TenantId tenantId, String key) {
 | 
			
		||||
        return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key));
 | 
			
		||||
        return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key, Limit.of(1)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -73,16 +78,26 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
 | 
			
		||||
        return jobRepository.existsByTenantIdAndTypeAndStatusIn(tenantId.getId(), type, Arrays.stream(statuses).toList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses) {
 | 
			
		||||
        return jobRepository.existsByTenantIdAndEntityIdAndStatusIn(tenantId.getId(), entityId.getId(), Arrays.stream(statuses).toList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status) {
 | 
			
		||||
        return DaoUtil.getData(jobRepository.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId.getId(), type.name(), status.name()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteByTenantId(TenantId tenantId) {
 | 
			
		||||
    public void removeByTenantId(TenantId tenantId) {
 | 
			
		||||
        jobRepository.deleteByTenantId(tenantId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public int removeByEntityId(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        return jobRepository.deleteByEntityId(entityId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public EntityType getEntityType() {
 | 
			
		||||
        return EntityType.JOB;
 | 
			
		||||
 | 
			
		||||
@ -955,7 +955,8 @@ CREATE TABLE IF NOT EXISTS job (
 | 
			
		||||
    tenant_id uuid NOT NULL,
 | 
			
		||||
    type varchar NOT NULL,
 | 
			
		||||
    key varchar NOT NULL,
 | 
			
		||||
    description varchar NOT NULL,
 | 
			
		||||
    entity_id uuid NOT NULL,
 | 
			
		||||
    entity_type varchar NOT NULL,
 | 
			
		||||
    status varchar NOT NULL,
 | 
			
		||||
    configuration varchar NOT NULL,
 | 
			
		||||
    result varchar
 | 
			
		||||
 | 
			
		||||
@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.job.Job;
 | 
			
		||||
 | 
			
		||||
public interface JobManager {
 | 
			
		||||
 | 
			
		||||
    Job submitJob(Job job);
 | 
			
		||||
    Job submitJob(Job job); // TODO: rate limits
 | 
			
		||||
 | 
			
		||||
    void cancelJob(TenantId tenantId, JobId jobId);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user