Discard tasks when tenant is deleted

This commit is contained in:
ViacheslavKlimov 2025-04-25 17:40:49 +03:00
parent ee9237c416
commit a317b4707a
6 changed files with 92 additions and 27 deletions

View File

@ -33,7 +33,7 @@ public class CfReprocessingTaskProcessor extends TaskProcessor<CfReprocessingTas
private final CalculatedFieldReprocessingService cfReprocessingService;
@Override
protected void process(CfReprocessingTask task) throws Exception {
public void process(CfReprocessingTask task) throws Exception {
SettableFuture<Void> future = SettableFuture.create();
cfReprocessingService.reprocess(task, new TbCallback() {
@Override

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.queue.task.TaskProcessor;
public class DummyTaskProcessor extends TaskProcessor<DummyTask> {
@Override
protected void process(DummyTask task) throws Exception {
public void process(DummyTask task) throws Exception {
if (task.getProcessingTimeMs() > 0) {
Thread.sleep(task.getProcessingTimeMs());
}

View File

@ -16,13 +16,16 @@
package org.thingsboard.server.service.job;
import com.fasterxml.jackson.core.type.TypeReference;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.DummyJobConfiguration;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobResult;
@ -32,6 +35,8 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.task.JobService;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.service.job.task.DummyTaskProcessor;
import java.util.List;
@ -42,6 +47,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@DaoSqlTest
@TestPropertySource(properties = {
@ -52,9 +59,15 @@ public class JobManagerTest extends AbstractControllerTest {
@Autowired
private JobManager jobManager;
@Autowired
private JobService jobService;
@SpyBean
private DummyTaskProcessor taskProcessor;
@SpyBean
private JobStatsService jobStatsService;
@Before
public void setUp() throws Exception {
loginTenantAdmin();
@ -185,6 +198,32 @@ public class JobManagerTest extends AbstractControllerTest {
});
}
@Test
public void whenTenantIsDeleted_thenCancelAllTheJobs() throws Exception {
loginSysAdmin();
createDifferentTenant();
TenantId tenantId = this.differentTenantId;
jobManager.submitJob(Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job")
.description("test job")
.configuration(DummyJobConfiguration.builder()
.successfulTasksCount(1000)
.taskProcessingTimeMs(500)
.build())
.build());
Thread.sleep(2000);
deleteDifferentTenant();
Mockito.reset(jobStatsService);
Thread.sleep(3000);
verify(jobStatsService, never()).reportTaskResult(any(), any(), any());
Assertions.assertThat(jobService.findJobsByTenantId(tenantId, new PageLink(100, 0)).getData()).isEmpty();
}
private Job findJobById(JobId jobId) throws Exception {
return doGet("/api/job/" + jobId, Job.class);
}

View File

@ -23,8 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task;
import org.thingsboard.server.common.data.job.TaskResult;
@ -57,6 +56,7 @@ public abstract class TaskProcessor<T extends Task> {
private QueueConsumerManager<TbProtoQueueMsg<TaskProto>> taskConsumer;
private ExecutorService consumerExecutor;
private final Set<UUID> deletedTenants = ConcurrentHashMap.newKeySet();
private final Set<UUID> cancelledJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine
@PostConstruct
@ -78,37 +78,54 @@ public abstract class TaskProcessor<T extends Task> {
}
@EventListener
public void onJobCancelled(ComponentLifecycleMsg event) {
if (event.getEntityId().getEntityType() != EntityType.JOB) {
return;
}
JobId jobId = (JobId) event.getEntityId();
public void onComponentLifecycle(ComponentLifecycleMsg event) {
EntityId entityId = event.getEntityId();
switch (entityId.getEntityType()) {
case JOB -> {
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
log.info("Adding job {} to cancelled", jobId);
addToCancelledJobs(jobId);
log.info("Adding job {} to cancelledJobs", entityId);
addToCancelledJobs(entityId.getId());
}
}
case TENANT -> {
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
deletedTenants.add(entityId.getId());
log.info("Adding tenant {} to deletedTenants", entityId);
}
}
}
}
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) {
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer) throws Exception {
for (TbProtoQueueMsg<TaskProto> msg : msgs) {
TaskProto taskProto = msg.getValue();
Task task = JacksonUtil.fromString(taskProto.getValue(), Task.class);
try {
Task task = JacksonUtil.fromString(msg.getValue().getValue(), Task.class);
if (cancelledJobs.contains(task.getJobId().getId())) {
log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId());
reportCancelled(task);
continue;
} else if (deletedTenants.contains(task.getTenantId().getId())) {
log.info("Skipping task '{}' for deleted tenant {}", task.getKey(), task.getTenantId());
continue;
}
processTask((T) task);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
log.error("Failed to process msg: {}", msg, e);
}
}
consumer.commit();
}
private void processTask(T task) {
private void processTask(T task) throws Exception { // todo: timeout and task interruption
task.setAttempt(task.getAttempt() + 1);
log.info("Processing task: {}", task);
try {
process(task);
reportSuccess(task);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e);
if (task.getAttempt() <= task.getRetries()) {
@ -119,7 +136,7 @@ public abstract class TaskProcessor<T extends Task> {
}
}
protected abstract void process(T task) throws Exception;
public abstract void process(T task) throws Exception;
private void reportSuccess(Task task) {
TaskResult result = TaskResult.builder()
@ -145,8 +162,8 @@ public abstract class TaskProcessor<T extends Task> {
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
}
public void addToCancelledJobs(JobId jobId) {
cancelledJobs.add(jobId.getId());
public void addToCancelledJobs(UUID jobId) {
cancelledJobs.add(jobId);
}
@PreDestroy

View File

@ -18,10 +18,10 @@ package org.thingsboard.server.dao.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.HasId;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
@ -73,6 +73,10 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
@Override
public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) {
Job job = findForUpdate(tenantId, jobId);
if (job == null) {
log.info("Got stale stats for job {}: {}", jobId, jobStats);
return;
}
switch (job.getStatus()) {
case PENDING -> {
job.setStatus(JobStatus.RUNNING);
@ -165,7 +169,12 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
@Override
public Optional<HasId<?>> findEntity(TenantId tenantId, EntityId entityId) {
return Optional.ofNullable(findJobById(tenantId, new JobId(entityId.getId())));
return Optional.ofNullable(findJobById(tenantId, (JobId) entityId));
}
@Override
public void deleteEntity(TenantId tenantId, EntityId id, boolean force) {
jobDao.removeById(tenantId, id.getId());
}
@Override

View File

@ -172,7 +172,7 @@ public class TenantServiceImpl extends AbstractCachedEntityService<TenantId, Ten
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(tenantId).entity(tenant).build());
cleanUpService.removeTenantEntities(tenantId, // don't forget to implement deleteEntity from EntityDaoService when adding entity type to this list
EntityType.ENTITY_VIEW, EntityType.WIDGETS_BUNDLE, EntityType.WIDGET_TYPE,
EntityType.JOB, EntityType.ENTITY_VIEW, EntityType.WIDGETS_BUNDLE, EntityType.WIDGET_TYPE,
EntityType.ASSET, EntityType.ASSET_PROFILE, EntityType.DEVICE, EntityType.DEVICE_PROFILE,
EntityType.DASHBOARD, EntityType.EDGE, EntityType.RULE_CHAIN, EntityType.API_USAGE_STATE,
EntityType.TB_RESOURCE, EntityType.OTA_PACKAGE, EntityType.RPC, EntityType.QUEUE,