diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index 1db84593f5..b301c2e321 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -19,6 +19,7 @@ import io.swagger.v3.oas.annotations.Parameter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -56,14 +57,14 @@ public class JobController extends BaseController { private final JobManager jobManager; @GetMapping("/job/{id}") - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public Job getJobById(@PathVariable UUID id) throws ThingsboardException { // todo check permissions return jobService.findJobById(getTenantId(), new JobId(id)); } @GetMapping("/jobs") - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public PageData getJobs(@Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize, @Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) @@ -86,17 +87,24 @@ public class JobController extends BaseController { } @PostMapping("/job/{id}/cancel") - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public void cancelJob(@PathVariable UUID id) throws ThingsboardException { // todo check permissions jobManager.cancelJob(getTenantId(), new JobId(id)); } @PostMapping("/job/{id}/reprocess") - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public void reprocessJob(@PathVariable UUID id) throws ThingsboardException { // todo check permissions jobManager.reprocessJob(getTenantId(), new JobId(id)); } + @DeleteMapping("/job/{id}") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + public void deleteJob(@PathVariable UUID id) throws ThingsboardException { + // todo check permissions + jobService.deleteJob(getTenantId(), new JobId(id)); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index bcb77e7005..83d7d60144 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -41,7 +41,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; -import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -305,10 +304,24 @@ public class EntityStateSourcingListener { private void onJobUpdate(Job job) { jobManager.ifPresent(jobManager -> jobManager.onJobUpdate(job)); - if (job.getResult().getCancellationTs() > 0 || (job.getStatus().isOneOf(JobStatus.FAILED) && job.getResult().getGeneralError() != null)) { - // task processors will add this job to the list of discarded - tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED); + + ComponentLifecycleEvent event; + if (job.getResult().getCancellationTs() > 0) { + event = ComponentLifecycleEvent.STOPPED; + } else if (job.getResult().getGeneralError() != null) { + event = ComponentLifecycleEvent.FAILED; + } else { + return; } + ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder() + .tenantId(job.getTenantId()) + .entityId(job.getId()) + .event(event) + .info(JacksonUtil.newObjectNode() + .put("tasksKey", job.getConfiguration().getTasksKey())) + .build(); + // task processors will add this job to the list of discarded + tbClusterService.broadcast(msg); } private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 580dd0ff99..ac237d7310 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.job; import jakarta.annotation.PreDestroy; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; @@ -178,26 +179,29 @@ public class DefaultJobManager implements JobManager { JobResult result = job.getResult(); if (result.getGeneralError() != null) { - throw new IllegalArgumentException("Reprocessing not allowed since job has general error"); + job.presetResult(); + } else { + List taskFailures = result.getResults().stream() + .filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded()) + .toList(); + if (result.getFailedCount() > taskFailures.size()) { + throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")"); + } + result.setFailedCount(0); + result.setResults(result.getResults().stream() + .filter(TaskResult::isSuccess) + .toList()); + job.getConfiguration().setToReprocess(taskFailures); } - List taskFailures = result.getResults().stream() - .filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded()) - .toList(); - if (result.getFailedCount() > taskFailures.size()) { - throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")"); - } - - result.setFailedCount(0); - result.setResults(result.getResults().stream() - .filter(TaskResult::isSuccess) - .toList()); - - job.getConfiguration().setToReprocess(taskFailures); - + job.getConfiguration().setTasksKey(UUID.randomUUID().toString()); jobService.saveJob(tenantId, job); } private void submitTask(Task task) { + if (ObjectUtils.anyNull(task.getTenantId(), task.getJobId(), task.getKey())) { + throw new IllegalArgumentException("Task " + task + " missing required fields"); + } + log.debug("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); TaskProto taskProto = TaskProto.newBuilder() .setValue(JacksonUtil.toString(task)) diff --git a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java index e1b91e606a..373bc4afee 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java @@ -76,6 +76,7 @@ public class DummyJobProcessor implements JobProcessor { return DummyTask.builder() .tenantId(job.getTenantId()) .jobId(job.getId()) + .key(configuration.getTasksKey()) .retries(configuration.getRetries()) .number(number) .processingTimeMs(configuration.getTaskProcessingTimeMs()) diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 1bcf6b36b3..5c88083146 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -36,7 +36,7 @@ public class DummyTaskProcessor extends TaskProcessor> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index a6e40333eb..3ae3bf5686 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -36,7 +36,7 @@ import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFail import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.controller.AbstractControllerTest; -import org.thingsboard.server.dao.job.JobService; +import org.thingsboard.server.dao.job.JobDao; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.queue.task.JobStatsService; @@ -69,7 +69,7 @@ public class JobManagerTest extends AbstractControllerTest { private JobStatsService jobStatsService; @Autowired - private JobService jobService; + private JobDao jobDao; @Before public void setUp() throws Exception { @@ -220,7 +220,7 @@ public class JobManagerTest extends AbstractControllerTest { inv.callRealMethod(); } return null; - }).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event, + }).when(taskProcessor).addToDiscarded(any()); // ignoring cancellation event, cancelJob(jobId); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -256,7 +256,7 @@ public class JobManagerTest extends AbstractControllerTest { Thread.sleep(3000); verify(jobStatsService, never()).reportTaskResult(any(), any(), any()); - assertThat(jobService.findJobsByFilter(tenantId, JobFilter.builder().build(), new PageLink(100)).getData()).isEmpty(); + assertThat(jobDao.findByTenantIdAndFilter(tenantId, JobFilter.builder().build(), new PageLink(100)).getData()).isEmpty(); } @Test @@ -340,7 +340,7 @@ public class JobManagerTest extends AbstractControllerTest { } @Test - public void testGeneralJobError() { + public void testSubmitJob_generalError() { int submittedTasks = 100; JobId jobId = jobManager.submitJob(Job.builder() .tenantId(tenantId) @@ -358,7 +358,7 @@ public class JobManagerTest extends AbstractControllerTest { Job job = findJobById(jobId); assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); assertThat(job.getResult().getSuccessfulCount()).isBetween(1, submittedTasks); - assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks); + assertThat(job.getResult().getDiscardedCount()).isZero(); assertThat(job.getResult().getTotalCount()).isNull(); }); @@ -369,7 +369,70 @@ public class JobManagerTest extends AbstractControllerTest { } @Test - public void testJobReprocessing() throws Exception { + public void testSubmitJob_immediateGeneralError() { + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("Test job") + .configuration(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(0) + .build()) + .build()).getId(); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + assertThat(job.getResult().getSuccessfulCount()).isZero(); + assertThat(job.getResult().getDiscardedCount()).isZero(); + assertThat(job.getResult().getFailedCount()).isZero(); + assertThat(job.getResult().getTotalCount()).isNull(); + }); + } + + @Test + public void testReprocessJob_generalError() throws Exception { + int submittedTasks = 100; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("Test job") + .configuration(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(submittedTasks) + .taskProcessingTimeMs(10) + .build()) + .build()).getId(); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + assertThat(job.getResult().getGeneralError()).isEqualTo("Some error while submitting tasks"); + }); + + Job savedJob = jobDao.findById(tenantId, jobId.getId()); + DummyJobConfiguration configuration = savedJob.getConfiguration(); + configuration.setGeneralError(null); + configuration.setSuccessfulTasksCount(submittedTasks); + jobDao.save(tenantId, savedJob); + + reprocessJob(jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); + assertThat(job.getResult().getGeneralError()).isNull(); + assertThat(job.getResult().getSuccessfulCount()).isEqualTo(submittedTasks); + assertThat(job.getResult().getTotalCount()).isEqualTo(submittedTasks); + assertThat(job.getResult().getFailedCount()).isZero(); + assertThat(job.getResult().getDiscardedCount()).isZero(); + }); + } + + @Test + public void testReprocessJob() throws Exception { int successfulTasks = 3; int failedTasks = 2; int totalTasksCount = successfulTasks + failedTasks; @@ -410,11 +473,12 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getFailedCount()).isZero(); assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount); assertThat(job.getResult().getResults()).isEmpty(); + assertThat(job.getConfiguration().getToReprocess()).isNullOrEmpty(); }); } @Test - public void testJobReprocessing_somePermanentlyFailed() throws Exception { + public void testReprocessJob_somePermanentlyFailed() throws Exception { int successfulTasks = 3; int failedTasks = 2; int permanentlyFailedTasks = 1; diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java index a021603ca6..59093ad802 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest_EntityPartitioningStrategy.java @@ -36,7 +36,7 @@ public class JobManagerTest_EntityPartitioningStrategy extends JobManagerTest { } @Override - public void testGeneralJobError() { + public void testSubmitJob_generalError() { } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index aed6eb4cf5..6da6fcf8a8 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -87,6 +87,8 @@ public interface TbClusterService extends TbQueueClusterService { void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state); + void broadcast(ComponentLifecycleMsg componentLifecycleMsg); + void onDeviceProfileChange(DeviceProfile deviceProfile, DeviceProfile oldDeviceProfile, TbQueueCallback callback); void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java index 33f9511267..7d4c68f6a4 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java @@ -40,4 +40,6 @@ public interface JobService extends EntityDaoService { Job findLatestJobByKey(TenantId tenantId, String key); + void deleteJob(TenantId tenantId, JobId jobId); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java index 62695eb237..a9ff9e7f4f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java @@ -20,6 +20,7 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import java.util.List; @@ -28,6 +29,7 @@ import java.util.List; @AllArgsConstructor @NoArgsConstructor @Builder +@ToString(callSuper = true) public class DummyJobConfiguration extends JobConfiguration { private long taskProcessingTimeMs; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java index d4e69761c8..f914067be3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java @@ -28,6 +28,8 @@ import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; +import java.util.UUID; + @Data @NoArgsConstructor @ToString(callSuper = true) @@ -42,19 +44,26 @@ public class Job extends BaseData implements HasTenantId { private String key; @NotBlank private String description; + @NotNull private JobStatus status; @NotNull @Valid private JobConfiguration configuration; + @NotNull private JobResult result; - @Builder + @Builder(toBuilder = true) public Job(TenantId tenantId, JobType type, String key, String description, JobConfiguration configuration) { this.tenantId = tenantId; this.type = type; this.key = key; this.description = description; this.configuration = configuration; + this.configuration.setTasksKey(UUID.randomUUID().toString()); + presetResult(); + } + + public void presetResult() { this.result = switch (type) { case DUMMY -> new DummyJobResult(); }; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java index 35f44cd4be..54d8166a5d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import jakarta.validation.constraints.NotBlank; import lombok.Data; import org.thingsboard.server.common.data.job.task.TaskResult; @@ -34,7 +35,9 @@ import java.util.List; @Data public abstract class JobConfiguration implements Serializable { - private List toReprocess; + @NotBlank + private String tasksKey; // internal + private List toReprocess; // internal @JsonIgnore public abstract JobType getType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java index 7e262ed7b8..e0f670ad9e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java @@ -46,7 +46,7 @@ public class DummyTask extends Task { @Override public DummyTaskResult toDiscarded() { - return DummyTaskResult.discarded(); + return DummyTaskResult.discarded(this); } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java index cd68b4d248..1988f13eb0 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job.task; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; import org.thingsboard.server.common.data.job.JobType; @@ -25,29 +26,34 @@ import org.thingsboard.server.common.data.job.JobType; @EqualsAndHashCode(callSuper = true) @NoArgsConstructor @SuperBuilder +@ToString(callSuper = true) public class DummyTaskResult extends TaskResult { - private static final DummyTaskResult SUCCESS = DummyTaskResult.builder().success(true).build(); - private static final DummyTaskResult DISCARDED = DummyTaskResult.builder().discarded(true).build(); - private DummyTaskFailure failure; - public static DummyTaskResult success() { - return SUCCESS; + public static DummyTaskResult success(DummyTask task) { + return DummyTaskResult.builder() + .key(task.getKey()) + .success(true) + .build(); } public static DummyTaskResult failed(DummyTask task, Throwable error) { - DummyTaskResult result = new DummyTaskResult(); - result.setFailure(DummyTaskFailure.builder() - .error(error.getMessage()) - .number(task.getNumber()) - .failAlways(task.isFailAlways()) - .build()); - return result; + return DummyTaskResult.builder() + .key(task.getKey()) + .failure(DummyTaskFailure.builder() + .error(error.getMessage()) + .number(task.getNumber()) + .failAlways(task.isFailAlways()) + .build()) + .build(); } - public static DummyTaskResult discarded() { - return DISCARDED; + public static DummyTaskResult discarded(DummyTask task) { + return DummyTaskResult.builder() + .key(task.getKey()) + .discarded(true) + .build(); } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java index cce32fdad0..fb373d9850 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/Task.java @@ -40,6 +40,7 @@ public abstract class Task { private TenantId tenantId; private JobId jobId; + private String key; private int retries; public Task() { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java index 9416cb8f6b..21303a55fe 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.job.JobType; }) public abstract class TaskResult { + private String key; private boolean success; private boolean discarded; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java index 13fd6159fc..d57301fd10 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.msg.plugin; +import com.fasterxml.jackson.databind.JsonNode; import lombok.Builder; import lombok.Data; import org.thingsboard.server.common.data.EntityType; @@ -45,13 +46,14 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg { private final String name; private final EntityId oldProfileId; private final EntityId profileId; + private final JsonNode info; public ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event) { - this(tenantId, entityId, event, null, null, null, null); + this(tenantId, entityId, event, null, null, null, null, null); } @Builder - private ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event, String oldName, String name, EntityId oldProfileId, EntityId profileId) { + private ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event, String oldName, String name, EntityId oldProfileId, EntityId profileId, JsonNode info) { this.tenantId = tenantId; this.entityId = entityId; this.event = event; @@ -59,6 +61,7 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg { this.name = name; this.oldProfileId = oldProfileId; this.profileId = profileId; + this.info = info; } public Optional getRuleChainId() { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 1ebd753f3c..9781cf9a7e 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -139,6 +139,9 @@ public class ProtoUtils { if (msg.getOldName() != null) { builder.setOldName(msg.getOldName()); } + if (msg.getInfo() != null) { + builder.setInfo(JacksonUtil.toString(msg.getInfo())); + } return builder.build(); } @@ -166,6 +169,9 @@ public class ProtoUtils { var profileType = EntityType.DEVICE.equals(entityId.getEntityType()) ? EntityType.DEVICE_PROFILE : EntityType.ASSET_PROFILE; builder.oldProfileId(EntityIdFactory.getByTypeAndUuid(profileType, new UUID(proto.getOldProfileIdMSB(), proto.getOldProfileIdLSB()))); } + if (proto.hasInfo()) { + builder.info(JacksonUtil.toJsonNode(proto.getInfo())); + } return builder.build(); } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 2a7d28241e..d03f59143c 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1261,6 +1261,7 @@ message ComponentLifecycleMsgProto { int64 oldProfileIdLSB = 10; int64 profileIdMSB = 11; int64 profileIdLSB = 12; + optional string info = 13; } message EdgeEventMsgProto { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 947d252442..ef0e48e30a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -68,7 +68,9 @@ public abstract class TaskProcessor, R extends TaskResult> { private MainQueueConsumerManager, QueueConfig> taskConsumer; private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor")); - private final SetCache discardedJobs = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); + private final SetCache discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); + private final SetCache failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); + private final SetCache deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); @PostConstruct @@ -98,9 +100,13 @@ public abstract class TaskProcessor, R extends TaskResult> { EntityId entityId = event.getEntityId(); switch (entityId.getEntityType()) { case JOB -> { + String tasksKey = event.getInfo().get("tasksKey").asText(); if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { - log.info("Adding job {} to discarded", entityId); - addToDiscardedJobs(entityId.getId()); + log.info("Adding job {} ({}) to discarded", entityId, tasksKey); + addToDiscarded(tasksKey); + } else if (event.getEvent() == ComponentLifecycleEvent.FAILED) { + log.info("Adding job {} ({}) to failed", entityId, tasksKey); + failed.add(tasksKey); } } case TENANT -> { @@ -117,14 +123,18 @@ public abstract class TaskProcessor, R extends TaskResult> { try { @SuppressWarnings("unchecked") T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class); - if (discardedJobs.contains(task.getJobId().getId())) { - log.debug("Skipping task for cancelled job {}: {}", task.getJobId(), task); + if (discarded.contains(task.getKey())) { + log.debug("Skipping task for discarded job {}: {}", task.getJobId(), task); reportTaskDiscarded(task); continue; + } else if (failed.contains(task.getKey())) { + log.debug("Skipping task for failed job {}: {}", task.getJobId(), task); + continue; } else if (deletedTenants.contains(task.getTenantId().getId())) { log.debug("Skipping task for deleted tenant {}: {}", task.getTenantId(), task); continue; } + processTask(task); } catch (InterruptedException e) { throw e; @@ -185,8 +195,8 @@ public abstract class TaskProcessor, R extends TaskResult> { statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } - public void addToDiscardedJobs(UUID jobId) { - discardedJobs.add(jobId); + public void addToDiscarded(String tasksKey) { + discarded.add(tasksKey); } protected V wait(Future future) throws Exception { diff --git a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java index 5903c10ac2..d153501b92 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java @@ -37,9 +37,10 @@ import com.google.common.collect.Lists; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.Contract; +import org.thingsboard.server.common.data.Views; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.Views; import java.io.File; import java.io.IOException; @@ -109,6 +110,7 @@ public class JacksonUtil { } } + @Contract("null, _ -> null") // so that IDE doesn't show NPE warning when input is not null public static T fromString(String string, Class clazz) { try { return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null; diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 50b3e4bde8..cd001b61b4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; +import org.thingsboard.server.dao.service.ConstraintValidator; import java.util.Optional; @@ -119,6 +120,11 @@ public class DefaultJobService extends AbstractEntityService implements JobServi boolean publishEvent = false; for (TaskResult taskResult : jobStats.getTaskResults()) { + if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) { + log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey()); + continue; + } + result.processTaskResult(taskResult); if (result.getCancellationTs() > 0) { @@ -142,6 +148,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi publishEvent = true; } result.setFinishTs(System.currentTimeMillis()); + job.getConfiguration().setToReprocess(null); } } @@ -149,6 +156,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi } private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) { + ConstraintValidator.validateFields(job); job = jobDao.save(tenantId, job); if (publishEvent) { eventPublisher.publishEvent(SaveEntityEvent.builder() @@ -186,6 +194,15 @@ public class DefaultJobService extends AbstractEntityService implements JobServi return jobDao.findLatestByTenantIdAndKey(tenantId, key); } + @Override + public void deleteJob(TenantId tenantId, JobId jobId) { + Job job = findJobById(tenantId, jobId); + if (!job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED)) { + throw new IllegalArgumentException("Job must be cancelled, completed or failed"); + } + jobDao.removeById(tenantId, jobId.getId()); + } + private Job findForUpdate(TenantId tenantId, JobId jobId) { return jobDao.findByIdForUpdate(tenantId, jobId); }