From c42cf2817e3b3728f5dca48933a41ffd9787518f Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 2 Dec 2024 17:51:19 +0200 Subject: [PATCH] Housekeeper: interrupt task processing on timeout --- .../housekeeper/HousekeeperService.java | 6 +- .../processor/HousekeeperTaskProcessor.java | 11 ++++ .../LatestTsDeletionTaskProcessor.java | 2 +- .../TsHistoryDeletionTaskProcessor.java | 2 +- .../housekeeper/HousekeeperServiceTest.java | 58 ++++++++++++++++++- .../engine/util/SemaphoreWithTbMsgQueue.java | 2 +- 6 files changed, 74 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java index 933f6fad3e..c69bc62692 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java @@ -117,9 +117,10 @@ public class HousekeeperService { throw new IllegalArgumentException("Unsupported task type " + taskType); } + Future future = null; try { long startTs = System.currentTimeMillis(); - Future future = taskExecutor.submit(() -> { + future = taskExecutor.submit(() -> { taskProcessor.process((T) task); return null; }); @@ -137,7 +138,8 @@ public class HousekeeperService { if (e instanceof ExecutionException) { error = e.getCause(); } else if (e instanceof TimeoutException) { - error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds"); + future.cancel(true); // interrupting the task + error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " ms"); } if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) { diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java index f193b154db..5ef8b2f7f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java @@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient; +import java.util.concurrent.Future; + public abstract class HousekeeperTaskProcessor { @Autowired @@ -29,4 +31,13 @@ public abstract class HousekeeperTaskProcessor { public abstract HousekeeperTaskType getTaskType(); + public V wait(Future future) throws Exception { + try { + return future.get(); // will be interrupted after taskProcessingTimeout + } catch (InterruptedException e) { + future.cancel(true); // interrupting the underlying task + throw e; + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java index 1f4b6b57ae..64d0043c5a 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java @@ -33,7 +33,7 @@ public class LatestTsDeletionTaskProcessor extends HousekeeperTaskProcessor { + Future future = someExecutor.submit(() -> { + try { + Thread.sleep(TimeUnit.HOURS.toMillis(24)); + } catch (InterruptedException e) { + underlyingTaskInterrupted.set(true); + } + }); + try { + future.get(); + } catch (InterruptedException e) { + taskInterrupted.set(true); + future.cancel(true); + throw e; + } + return null; + }).when(tsHistoryDeletionTaskProcessor).process(any()); + + Device device = createDevice("test", "test"); + createRelatedData(device.getId()); + + doDelete("/api/device/" + device.getId()).andExpect(status().isOk()); + + int attempts = 2; + await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + for (int i = 0; i <= attempts; i++) { + int attempt = i; + verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, + task -> task.getAttempt() == attempt)), argThat(error -> error instanceof TimeoutException)); + } + }); + assertThat(taskInterrupted).isTrue(); + assertThat(underlyingTaskInterrupted).isTrue(); + + assertThat(getTimeseriesHistory(device.getId())).isNotEmpty(); + doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any()); + someExecutor.shutdown(); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(getTimeseriesHistory(device.getId())).isEmpty(); + }); + } + @Test public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception { TimeoutException error = new TimeoutException("Test timeout"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java index fa00856b4b..50a109163c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java @@ -115,7 +115,7 @@ public class SemaphoreWithTbMsgQueue { } TbMsg msg = tbMsgTbContext.msg(); TbContext ctx = tbMsgTbContext.ctx(); - log.warn("[{}] Failed to process message: {}", entityId, msg, t); + log.debug("[{}] Failed to process message: {}", entityId, msg, t); ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty }