Support reprocessing for job with general error; job deletion; refactoring
This commit is contained in:
parent
8dda445253
commit
0dde966082
@ -19,6 +19,7 @@ import io.swagger.v3.oas.annotations.Parameter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
@ -56,14 +57,14 @@ public class JobController extends BaseController {
|
||||
private final JobManager jobManager;
|
||||
|
||||
@GetMapping("/job/{id}")
|
||||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
public Job getJobById(@PathVariable UUID id) throws ThingsboardException {
|
||||
// todo check permissions
|
||||
return jobService.findJobById(getTenantId(), new JobId(id));
|
||||
}
|
||||
|
||||
@GetMapping("/jobs")
|
||||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
public PageData<Job> getJobs(@Parameter(description = PAGE_SIZE_DESCRIPTION, required = true)
|
||||
@RequestParam int pageSize,
|
||||
@Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true)
|
||||
@ -86,17 +87,24 @@ public class JobController extends BaseController {
|
||||
}
|
||||
|
||||
@PostMapping("/job/{id}/cancel")
|
||||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
public void cancelJob(@PathVariable UUID id) throws ThingsboardException {
|
||||
// todo check permissions
|
||||
jobManager.cancelJob(getTenantId(), new JobId(id));
|
||||
}
|
||||
|
||||
@PostMapping("/job/{id}/reprocess")
|
||||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
public void reprocessJob(@PathVariable UUID id) throws ThingsboardException {
|
||||
// todo check permissions
|
||||
jobManager.reprocessJob(getTenantId(), new JobId(id));
|
||||
}
|
||||
|
||||
@DeleteMapping("/job/{id}")
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
public void deleteJob(@PathVariable UUID id) throws ThingsboardException {
|
||||
// todo check permissions
|
||||
jobService.deleteJob(getTenantId(), new JobId(id));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -41,7 +41,6 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.job.Job;
|
||||
import org.thingsboard.server.common.data.job.JobStatus;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
@ -305,10 +304,24 @@ public class EntityStateSourcingListener {
|
||||
|
||||
private void onJobUpdate(Job job) {
|
||||
jobManager.ifPresent(jobManager -> jobManager.onJobUpdate(job));
|
||||
if (job.getResult().getCancellationTs() > 0 || (job.getStatus().isOneOf(JobStatus.FAILED) && job.getResult().getGeneralError() != null)) {
|
||||
// task processors will add this job to the list of discarded
|
||||
tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED);
|
||||
|
||||
ComponentLifecycleEvent event;
|
||||
if (job.getResult().getCancellationTs() > 0) {
|
||||
event = ComponentLifecycleEvent.STOPPED;
|
||||
} else if (job.getResult().getGeneralError() != null) {
|
||||
event = ComponentLifecycleEvent.FAILED;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
ComponentLifecycleMsg msg = ComponentLifecycleMsg.builder()
|
||||
.tenantId(job.getTenantId())
|
||||
.entityId(job.getId())
|
||||
.event(event)
|
||||
.info(JacksonUtil.newObjectNode()
|
||||
.put("tasksKey", job.getConfiguration().getTasksKey()))
|
||||
.build();
|
||||
// task processors will add this job to the list of discarded
|
||||
tbClusterService.broadcast(msg);
|
||||
}
|
||||
|
||||
private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.job;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
@ -178,26 +179,29 @@ public class DefaultJobManager implements JobManager {
|
||||
|
||||
JobResult result = job.getResult();
|
||||
if (result.getGeneralError() != null) {
|
||||
throw new IllegalArgumentException("Reprocessing not allowed since job has general error");
|
||||
job.presetResult();
|
||||
} else {
|
||||
List<TaskResult> taskFailures = result.getResults().stream()
|
||||
.filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded())
|
||||
.toList();
|
||||
if (result.getFailedCount() > taskFailures.size()) {
|
||||
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")");
|
||||
}
|
||||
result.setFailedCount(0);
|
||||
result.setResults(result.getResults().stream()
|
||||
.filter(TaskResult::isSuccess)
|
||||
.toList());
|
||||
job.getConfiguration().setToReprocess(taskFailures);
|
||||
}
|
||||
List<TaskResult> taskFailures = result.getResults().stream()
|
||||
.filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded())
|
||||
.toList();
|
||||
if (result.getFailedCount() > taskFailures.size()) {
|
||||
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")");
|
||||
}
|
||||
|
||||
result.setFailedCount(0);
|
||||
result.setResults(result.getResults().stream()
|
||||
.filter(TaskResult::isSuccess)
|
||||
.toList());
|
||||
|
||||
job.getConfiguration().setToReprocess(taskFailures);
|
||||
|
||||
job.getConfiguration().setTasksKey(UUID.randomUUID().toString());
|
||||
jobService.saveJob(tenantId, job);
|
||||
}
|
||||
|
||||
private void submitTask(Task<?> task) {
|
||||
if (ObjectUtils.anyNull(task.getTenantId(), task.getJobId(), task.getKey())) {
|
||||
throw new IllegalArgumentException("Task " + task + " missing required fields");
|
||||
}
|
||||
|
||||
log.debug("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task);
|
||||
TaskProto taskProto = TaskProto.newBuilder()
|
||||
.setValue(JacksonUtil.toString(task))
|
||||
|
||||
@ -76,6 +76,7 @@ public class DummyJobProcessor implements JobProcessor {
|
||||
return DummyTask.builder()
|
||||
.tenantId(job.getTenantId())
|
||||
.jobId(job.getId())
|
||||
.key(configuration.getTasksKey())
|
||||
.retries(configuration.getRetries())
|
||||
.number(number)
|
||||
.processingTimeMs(configuration.getTaskProcessingTimeMs())
|
||||
|
||||
@ -36,7 +36,7 @@ public class DummyTaskProcessor extends TaskProcessor<DummyTask, DummyTaskResult
|
||||
String error = task.getErrors().get(task.getAttempt() - 1);
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
return DummyTaskResult.success();
|
||||
return DummyTaskResult.success(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -579,7 +579,8 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
}
|
||||
|
||||
private void broadcast(ComponentLifecycleMsg msg) {
|
||||
@Override
|
||||
public void broadcast(ComponentLifecycleMsg msg) {
|
||||
ComponentLifecycleMsgProto componentLifecycleMsgProto = toProto(msg);
|
||||
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
|
||||
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
|
||||
|
||||
@ -36,7 +36,7 @@ import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFail
|
||||
import org.thingsboard.server.common.data.notification.Notification;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.job.JobService;
|
||||
import org.thingsboard.server.dao.job.JobDao;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.queue.task.JobStatsService;
|
||||
|
||||
@ -69,7 +69,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
private JobStatsService jobStatsService;
|
||||
|
||||
@Autowired
|
||||
private JobService jobService;
|
||||
private JobDao jobDao;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -220,7 +220,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
inv.callRealMethod();
|
||||
}
|
||||
return null;
|
||||
}).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event,
|
||||
}).when(taskProcessor).addToDiscarded(any()); // ignoring cancellation event,
|
||||
cancelJob(jobId);
|
||||
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
@ -256,7 +256,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
|
||||
Thread.sleep(3000);
|
||||
verify(jobStatsService, never()).reportTaskResult(any(), any(), any());
|
||||
assertThat(jobService.findJobsByFilter(tenantId, JobFilter.builder().build(), new PageLink(100)).getData()).isEmpty();
|
||||
assertThat(jobDao.findByTenantIdAndFilter(tenantId, JobFilter.builder().build(), new PageLink(100)).getData()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -340,7 +340,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeneralJobError() {
|
||||
public void testSubmitJob_generalError() {
|
||||
int submittedTasks = 100;
|
||||
JobId jobId = jobManager.submitJob(Job.builder()
|
||||
.tenantId(tenantId)
|
||||
@ -358,7 +358,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
Job job = findJobById(jobId);
|
||||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, submittedTasks);
|
||||
assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks);
|
||||
assertThat(job.getResult().getDiscardedCount()).isZero();
|
||||
assertThat(job.getResult().getTotalCount()).isNull();
|
||||
});
|
||||
|
||||
@ -369,7 +369,70 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobReprocessing() throws Exception {
|
||||
public void testSubmitJob_immediateGeneralError() {
|
||||
JobId jobId = jobManager.submitJob(Job.builder()
|
||||
.tenantId(tenantId)
|
||||
.type(JobType.DUMMY)
|
||||
.key("test-job")
|
||||
.description("Test job")
|
||||
.configuration(DummyJobConfiguration.builder()
|
||||
.generalError("Some error while submitting tasks")
|
||||
.submittedTasksBeforeGeneralError(0)
|
||||
.build())
|
||||
.build()).getId();
|
||||
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
Job job = findJobById(jobId);
|
||||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||
assertThat(job.getResult().getSuccessfulCount()).isZero();
|
||||
assertThat(job.getResult().getDiscardedCount()).isZero();
|
||||
assertThat(job.getResult().getFailedCount()).isZero();
|
||||
assertThat(job.getResult().getTotalCount()).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReprocessJob_generalError() throws Exception {
|
||||
int submittedTasks = 100;
|
||||
JobId jobId = jobManager.submitJob(Job.builder()
|
||||
.tenantId(tenantId)
|
||||
.type(JobType.DUMMY)
|
||||
.key("test-job")
|
||||
.description("Test job")
|
||||
.configuration(DummyJobConfiguration.builder()
|
||||
.generalError("Some error while submitting tasks")
|
||||
.submittedTasksBeforeGeneralError(submittedTasks)
|
||||
.taskProcessingTimeMs(10)
|
||||
.build())
|
||||
.build()).getId();
|
||||
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
Job job = findJobById(jobId);
|
||||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
|
||||
assertThat(job.getResult().getGeneralError()).isEqualTo("Some error while submitting tasks");
|
||||
});
|
||||
|
||||
Job savedJob = jobDao.findById(tenantId, jobId.getId());
|
||||
DummyJobConfiguration configuration = savedJob.getConfiguration();
|
||||
configuration.setGeneralError(null);
|
||||
configuration.setSuccessfulTasksCount(submittedTasks);
|
||||
jobDao.save(tenantId, savedJob);
|
||||
|
||||
reprocessJob(jobId);
|
||||
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
Job job = findJobById(jobId);
|
||||
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED);
|
||||
assertThat(job.getResult().getGeneralError()).isNull();
|
||||
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(submittedTasks);
|
||||
assertThat(job.getResult().getTotalCount()).isEqualTo(submittedTasks);
|
||||
assertThat(job.getResult().getFailedCount()).isZero();
|
||||
assertThat(job.getResult().getDiscardedCount()).isZero();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReprocessJob() throws Exception {
|
||||
int successfulTasks = 3;
|
||||
int failedTasks = 2;
|
||||
int totalTasksCount = successfulTasks + failedTasks;
|
||||
@ -410,11 +473,12 @@ public class JobManagerTest extends AbstractControllerTest {
|
||||
assertThat(job.getResult().getFailedCount()).isZero();
|
||||
assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount);
|
||||
assertThat(job.getResult().getResults()).isEmpty();
|
||||
assertThat(job.getConfiguration().getToReprocess()).isNullOrEmpty();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobReprocessing_somePermanentlyFailed() throws Exception {
|
||||
public void testReprocessJob_somePermanentlyFailed() throws Exception {
|
||||
int successfulTasks = 3;
|
||||
int failedTasks = 2;
|
||||
int permanentlyFailedTasks = 1;
|
||||
|
||||
@ -36,7 +36,7 @@ public class JobManagerTest_EntityPartitioningStrategy extends JobManagerTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testGeneralJobError() {
|
||||
public void testSubmitJob_generalError() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -87,6 +87,8 @@ public interface TbClusterService extends TbQueueClusterService {
|
||||
|
||||
void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
|
||||
|
||||
void broadcast(ComponentLifecycleMsg componentLifecycleMsg);
|
||||
|
||||
void onDeviceProfileChange(DeviceProfile deviceProfile, DeviceProfile oldDeviceProfile, TbQueueCallback callback);
|
||||
|
||||
void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback);
|
||||
|
||||
@ -40,4 +40,6 @@ public interface JobService extends EntityDaoService {
|
||||
|
||||
Job findLatestJobByKey(TenantId tenantId, String key);
|
||||
|
||||
void deleteJob(TenantId tenantId, JobId jobId);
|
||||
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -28,6 +29,7 @@ import java.util.List;
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
@ToString(callSuper = true)
|
||||
public class DummyJobConfiguration extends JobConfiguration {
|
||||
|
||||
private long taskProcessingTimeMs;
|
||||
|
||||
@ -28,6 +28,8 @@ import org.thingsboard.server.common.data.HasTenantId;
|
||||
import org.thingsboard.server.common.data.id.JobId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@ToString(callSuper = true)
|
||||
@ -42,19 +44,26 @@ public class Job extends BaseData<JobId> implements HasTenantId {
|
||||
private String key;
|
||||
@NotBlank
|
||||
private String description;
|
||||
@NotNull
|
||||
private JobStatus status;
|
||||
@NotNull
|
||||
@Valid
|
||||
private JobConfiguration configuration;
|
||||
@NotNull
|
||||
private JobResult result;
|
||||
|
||||
@Builder
|
||||
@Builder(toBuilder = true)
|
||||
public Job(TenantId tenantId, JobType type, String key, String description, JobConfiguration configuration) {
|
||||
this.tenantId = tenantId;
|
||||
this.type = type;
|
||||
this.key = key;
|
||||
this.description = description;
|
||||
this.configuration = configuration;
|
||||
this.configuration.setTasksKey(UUID.randomUUID().toString());
|
||||
presetResult();
|
||||
}
|
||||
|
||||
public void presetResult() {
|
||||
this.result = switch (type) {
|
||||
case DUMMY -> new DummyJobResult();
|
||||
};
|
||||
|
||||
@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.job.task.TaskResult;
|
||||
|
||||
@ -34,7 +35,9 @@ import java.util.List;
|
||||
@Data
|
||||
public abstract class JobConfiguration implements Serializable {
|
||||
|
||||
private List<TaskResult> toReprocess;
|
||||
@NotBlank
|
||||
private String tasksKey; // internal
|
||||
private List<TaskResult> toReprocess; // internal
|
||||
|
||||
@JsonIgnore
|
||||
public abstract JobType getType();
|
||||
|
||||
@ -46,7 +46,7 @@ public class DummyTask extends Task<DummyTaskResult> {
|
||||
|
||||
@Override
|
||||
public DummyTaskResult toDiscarded() {
|
||||
return DummyTaskResult.discarded();
|
||||
return DummyTaskResult.discarded(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job.task;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.thingsboard.server.common.data.job.JobType;
|
||||
|
||||
@ -25,29 +26,34 @@ import org.thingsboard.server.common.data.job.JobType;
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@NoArgsConstructor
|
||||
@SuperBuilder
|
||||
@ToString(callSuper = true)
|
||||
public class DummyTaskResult extends TaskResult {
|
||||
|
||||
private static final DummyTaskResult SUCCESS = DummyTaskResult.builder().success(true).build();
|
||||
private static final DummyTaskResult DISCARDED = DummyTaskResult.builder().discarded(true).build();
|
||||
|
||||
private DummyTaskFailure failure;
|
||||
|
||||
public static DummyTaskResult success() {
|
||||
return SUCCESS;
|
||||
public static DummyTaskResult success(DummyTask task) {
|
||||
return DummyTaskResult.builder()
|
||||
.key(task.getKey())
|
||||
.success(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static DummyTaskResult failed(DummyTask task, Throwable error) {
|
||||
DummyTaskResult result = new DummyTaskResult();
|
||||
result.setFailure(DummyTaskFailure.builder()
|
||||
.error(error.getMessage())
|
||||
.number(task.getNumber())
|
||||
.failAlways(task.isFailAlways())
|
||||
.build());
|
||||
return result;
|
||||
return DummyTaskResult.builder()
|
||||
.key(task.getKey())
|
||||
.failure(DummyTaskFailure.builder()
|
||||
.error(error.getMessage())
|
||||
.number(task.getNumber())
|
||||
.failAlways(task.isFailAlways())
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static DummyTaskResult discarded() {
|
||||
return DISCARDED;
|
||||
public static DummyTaskResult discarded(DummyTask task) {
|
||||
return DummyTaskResult.builder()
|
||||
.key(task.getKey())
|
||||
.discarded(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -40,6 +40,7 @@ public abstract class Task<R extends TaskResult> {
|
||||
|
||||
private TenantId tenantId;
|
||||
private JobId jobId;
|
||||
private String key;
|
||||
private int retries;
|
||||
|
||||
public Task() {
|
||||
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.job.JobType;
|
||||
})
|
||||
public abstract class TaskResult {
|
||||
|
||||
private String key;
|
||||
private boolean success;
|
||||
private boolean discarded;
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.plugin;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -45,13 +46,14 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
|
||||
private final String name;
|
||||
private final EntityId oldProfileId;
|
||||
private final EntityId profileId;
|
||||
private final JsonNode info;
|
||||
|
||||
public ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event) {
|
||||
this(tenantId, entityId, event, null, null, null, null);
|
||||
this(tenantId, entityId, event, null, null, null, null, null);
|
||||
}
|
||||
|
||||
@Builder
|
||||
private ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event, String oldName, String name, EntityId oldProfileId, EntityId profileId) {
|
||||
private ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event, String oldName, String name, EntityId oldProfileId, EntityId profileId, JsonNode info) {
|
||||
this.tenantId = tenantId;
|
||||
this.entityId = entityId;
|
||||
this.event = event;
|
||||
@ -59,6 +61,7 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
|
||||
this.name = name;
|
||||
this.oldProfileId = oldProfileId;
|
||||
this.profileId = profileId;
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
public Optional<RuleChainId> getRuleChainId() {
|
||||
|
||||
@ -139,6 +139,9 @@ public class ProtoUtils {
|
||||
if (msg.getOldName() != null) {
|
||||
builder.setOldName(msg.getOldName());
|
||||
}
|
||||
if (msg.getInfo() != null) {
|
||||
builder.setInfo(JacksonUtil.toString(msg.getInfo()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -166,6 +169,9 @@ public class ProtoUtils {
|
||||
var profileType = EntityType.DEVICE.equals(entityId.getEntityType()) ? EntityType.DEVICE_PROFILE : EntityType.ASSET_PROFILE;
|
||||
builder.oldProfileId(EntityIdFactory.getByTypeAndUuid(profileType, new UUID(proto.getOldProfileIdMSB(), proto.getOldProfileIdLSB())));
|
||||
}
|
||||
if (proto.hasInfo()) {
|
||||
builder.info(JacksonUtil.toJsonNode(proto.getInfo()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@ -1261,6 +1261,7 @@ message ComponentLifecycleMsgProto {
|
||||
int64 oldProfileIdLSB = 10;
|
||||
int64 profileIdMSB = 11;
|
||||
int64 profileIdLSB = 12;
|
||||
optional string info = 13;
|
||||
}
|
||||
|
||||
message EdgeEventMsgProto {
|
||||
|
||||
@ -68,7 +68,9 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
|
||||
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer;
|
||||
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor"));
|
||||
|
||||
private final SetCache<UUID> discardedJobs = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
|
||||
private final SetCache<String> discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
|
||||
private final SetCache<String> failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
|
||||
|
||||
private final SetCache<UUID> deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
|
||||
|
||||
@PostConstruct
|
||||
@ -98,9 +100,13 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
|
||||
EntityId entityId = event.getEntityId();
|
||||
switch (entityId.getEntityType()) {
|
||||
case JOB -> {
|
||||
String tasksKey = event.getInfo().get("tasksKey").asText();
|
||||
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
|
||||
log.info("Adding job {} to discarded", entityId);
|
||||
addToDiscardedJobs(entityId.getId());
|
||||
log.info("Adding job {} ({}) to discarded", entityId, tasksKey);
|
||||
addToDiscarded(tasksKey);
|
||||
} else if (event.getEvent() == ComponentLifecycleEvent.FAILED) {
|
||||
log.info("Adding job {} ({}) to failed", entityId, tasksKey);
|
||||
failed.add(tasksKey);
|
||||
}
|
||||
}
|
||||
case TENANT -> {
|
||||
@ -117,14 +123,18 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class);
|
||||
if (discardedJobs.contains(task.getJobId().getId())) {
|
||||
log.debug("Skipping task for cancelled job {}: {}", task.getJobId(), task);
|
||||
if (discarded.contains(task.getKey())) {
|
||||
log.debug("Skipping task for discarded job {}: {}", task.getJobId(), task);
|
||||
reportTaskDiscarded(task);
|
||||
continue;
|
||||
} else if (failed.contains(task.getKey())) {
|
||||
log.debug("Skipping task for failed job {}: {}", task.getJobId(), task);
|
||||
continue;
|
||||
} else if (deletedTenants.contains(task.getTenantId().getId())) {
|
||||
log.debug("Skipping task for deleted tenant {}: {}", task.getTenantId(), task);
|
||||
continue;
|
||||
}
|
||||
|
||||
processTask(task);
|
||||
} catch (InterruptedException e) {
|
||||
throw e;
|
||||
@ -185,8 +195,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
|
||||
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
|
||||
}
|
||||
|
||||
public void addToDiscardedJobs(UUID jobId) {
|
||||
discardedJobs.add(jobId);
|
||||
public void addToDiscarded(String tasksKey) {
|
||||
discarded.add(tasksKey);
|
||||
}
|
||||
|
||||
protected <V> V wait(Future<V> future) throws Exception {
|
||||
|
||||
@ -37,9 +37,10 @@ import com.google.common.collect.Lists;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jetbrains.annotations.Contract;
|
||||
import org.thingsboard.server.common.data.Views;
|
||||
import org.thingsboard.server.common.data.kv.DataType;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.Views;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@ -109,6 +110,7 @@ public class JacksonUtil {
|
||||
}
|
||||
}
|
||||
|
||||
@Contract("null, _ -> null") // so that IDE doesn't show NPE warning when input is not null
|
||||
public static <T> T fromString(String string, Class<T> clazz) {
|
||||
try {
|
||||
return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null;
|
||||
|
||||
@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
|
||||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
||||
import org.thingsboard.server.dao.service.ConstraintValidator;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@ -119,6 +120,11 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
||||
|
||||
boolean publishEvent = false;
|
||||
for (TaskResult taskResult : jobStats.getTaskResults()) {
|
||||
if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) {
|
||||
log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey());
|
||||
continue;
|
||||
}
|
||||
|
||||
result.processTaskResult(taskResult);
|
||||
|
||||
if (result.getCancellationTs() > 0) {
|
||||
@ -142,6 +148,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
||||
publishEvent = true;
|
||||
}
|
||||
result.setFinishTs(System.currentTimeMillis());
|
||||
job.getConfiguration().setToReprocess(null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,6 +156,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
||||
}
|
||||
|
||||
private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) {
|
||||
ConstraintValidator.validateFields(job);
|
||||
job = jobDao.save(tenantId, job);
|
||||
if (publishEvent) {
|
||||
eventPublisher.publishEvent(SaveEntityEvent.builder()
|
||||
@ -186,6 +194,15 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
||||
return jobDao.findLatestByTenantIdAndKey(tenantId, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteJob(TenantId tenantId, JobId jobId) {
|
||||
Job job = findJobById(tenantId, jobId);
|
||||
if (!job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED)) {
|
||||
throw new IllegalArgumentException("Job must be cancelled, completed or failed");
|
||||
}
|
||||
jobDao.removeById(tenantId, jobId.getId());
|
||||
}
|
||||
|
||||
private Job findForUpdate(TenantId tenantId, JobId jobId) {
|
||||
return jobDao.findByIdForUpdate(tenantId, jobId);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user