Housekeeper: interrupt task processing on timeout
This commit is contained in:
		
							parent
							
								
									cd0e8a0211
								
							
						
					
					
						commit
						c42cf2817e
					
				@ -117,9 +117,10 @@ public class HousekeeperService {
 | 
			
		||||
            throw new IllegalArgumentException("Unsupported task type " + taskType);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Future<Object> future = null;
 | 
			
		||||
        try {
 | 
			
		||||
            long startTs = System.currentTimeMillis();
 | 
			
		||||
            Future<Object> future = taskExecutor.submit(() -> {
 | 
			
		||||
            future = taskExecutor.submit(() -> {
 | 
			
		||||
                taskProcessor.process((T) task);
 | 
			
		||||
                return null;
 | 
			
		||||
            });
 | 
			
		||||
@ -137,7 +138,8 @@ public class HousekeeperService {
 | 
			
		||||
            if (e instanceof ExecutionException) {
 | 
			
		||||
                error = e.getCause();
 | 
			
		||||
            } else if (e instanceof TimeoutException) {
 | 
			
		||||
                error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds");
 | 
			
		||||
                future.cancel(true); // interrupting the task
 | 
			
		||||
                error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " ms");
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) {
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
 | 
			
		||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
 | 
			
		||||
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
 | 
			
		||||
public abstract class HousekeeperTaskProcessor<T extends HousekeeperTask> {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
@ -29,4 +31,13 @@ public abstract class HousekeeperTaskProcessor<T extends HousekeeperTask> {
 | 
			
		||||
 | 
			
		||||
    public abstract HousekeeperTaskType getTaskType();
 | 
			
		||||
 | 
			
		||||
    public <V> V wait(Future<V> future) throws Exception {
 | 
			
		||||
        try {
 | 
			
		||||
            return future.get(); // will be interrupted after taskProcessingTimeout
 | 
			
		||||
        } catch (InterruptedException e) {
 | 
			
		||||
            future.cancel(true); // interrupting the underlying task
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -33,7 +33,7 @@ public class LatestTsDeletionTaskProcessor extends HousekeeperTaskProcessor<Late
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(LatestTsDeletionHousekeeperTask task) throws Exception {
 | 
			
		||||
        timeseriesService.removeLatest(task.getTenantId(), task.getEntityId(), List.of(task.getKey())).get();
 | 
			
		||||
        wait(timeseriesService.removeLatest(task.getTenantId(), task.getEntityId(), List.of(task.getKey())));
 | 
			
		||||
        log.debug("[{}][{}][{}] Deleted latest telemetry for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -36,7 +36,7 @@ public class TsHistoryDeletionTaskProcessor extends HousekeeperTaskProcessor<TsH
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(TsHistoryDeletionHousekeeperTask task) throws Exception {
 | 
			
		||||
        DeleteTsKvQuery deleteQuery = new BaseDeleteTsKvQuery(task.getKey(), 0, System.currentTimeMillis(), false, false);
 | 
			
		||||
        timeseriesService.remove(task.getTenantId(), task.getEntityId(), List.of(deleteQuery)).get();
 | 
			
		||||
        wait(timeseriesService.remove(task.getTenantId(), task.getEntityId(), List.of(deleteQuery)));
 | 
			
		||||
        log.debug("[{}][{}][{}] Deleted timeseries history for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -83,8 +83,12 @@ import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.TimeoutException;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
import java.util.function.Predicate;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
@ -93,6 +97,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.awaitility.Awaitility.await;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.argThat;
 | 
			
		||||
import static org.mockito.Mockito.doAnswer;
 | 
			
		||||
import static org.mockito.Mockito.doCallRealMethod;
 | 
			
		||||
import static org.mockito.Mockito.doThrow;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
@ -103,7 +108,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "queue.core.housekeeper.task-reprocessing-delay-ms=2000",
 | 
			
		||||
        "queue.core.housekeeper.poll-interval-ms=1000",
 | 
			
		||||
        "queue.core.housekeeper.max-reprocessing-attempts=5"
 | 
			
		||||
        "queue.core.housekeeper.max-reprocessing-attempts=5",
 | 
			
		||||
        "queue.core.housekeeper.task-processing-timeout-ms=5000",
 | 
			
		||||
})
 | 
			
		||||
public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
@ -285,7 +291,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenTaskProcessingFails_thenReprocess() throws Exception {
 | 
			
		||||
        TimeoutException error = new TimeoutException("Test timeout");
 | 
			
		||||
        Exception error = new RuntimeException("Just a test");
 | 
			
		||||
        doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
 | 
			
		||||
        Device device = createDevice("test", "test");
 | 
			
		||||
@ -309,6 +315,54 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenTaskProcessingTimedOut_thenInterruptAndReprocess() throws Exception {
 | 
			
		||||
        ExecutorService someExecutor = Executors.newSingleThreadExecutor();
 | 
			
		||||
        AtomicBoolean taskInterrupted = new AtomicBoolean(false);
 | 
			
		||||
        AtomicBoolean underlyingTaskInterrupted = new AtomicBoolean(false);
 | 
			
		||||
        doAnswer(invocationOnMock -> {
 | 
			
		||||
            Future<?> future = someExecutor.submit(() -> {
 | 
			
		||||
                try {
 | 
			
		||||
                    Thread.sleep(TimeUnit.HOURS.toMillis(24));
 | 
			
		||||
                } catch (InterruptedException e) {
 | 
			
		||||
                    underlyingTaskInterrupted.set(true);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            try {
 | 
			
		||||
                future.get();
 | 
			
		||||
            } catch (InterruptedException e) {
 | 
			
		||||
                taskInterrupted.set(true);
 | 
			
		||||
                future.cancel(true);
 | 
			
		||||
                throw e;
 | 
			
		||||
            }
 | 
			
		||||
            return null;
 | 
			
		||||
        }).when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
 | 
			
		||||
        Device device = createDevice("test", "test");
 | 
			
		||||
        createRelatedData(device.getId());
 | 
			
		||||
 | 
			
		||||
        doDelete("/api/device/" + device.getId()).andExpect(status().isOk());
 | 
			
		||||
 | 
			
		||||
        int attempts = 2;
 | 
			
		||||
        await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            for (int i = 0; i <= attempts; i++) {
 | 
			
		||||
                int attempt = i;
 | 
			
		||||
                verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY,
 | 
			
		||||
                        task -> task.getAttempt() == attempt)), argThat(error -> error instanceof TimeoutException));
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        assertThat(taskInterrupted).isTrue();
 | 
			
		||||
        assertThat(underlyingTaskInterrupted).isTrue();
 | 
			
		||||
 | 
			
		||||
        assertThat(getTimeseriesHistory(device.getId())).isNotEmpty();
 | 
			
		||||
        doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
        someExecutor.shutdown();
 | 
			
		||||
 | 
			
		||||
        await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            assertThat(getTimeseriesHistory(device.getId())).isEmpty();
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception {
 | 
			
		||||
        TimeoutException error = new TimeoutException("Test timeout");
 | 
			
		||||
 | 
			
		||||
@ -115,7 +115,7 @@ public class SemaphoreWithTbMsgQueue {
 | 
			
		||||
                }
 | 
			
		||||
                TbMsg msg = tbMsgTbContext.msg();
 | 
			
		||||
                TbContext ctx = tbMsgTbContext.ctx();
 | 
			
		||||
                log.warn("[{}] Failed to process message: {}", entityId, msg, t);
 | 
			
		||||
                log.debug("[{}] Failed to process message: {}", entityId, msg, t);
 | 
			
		||||
                ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed
 | 
			
		||||
                continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user