diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java index 933f6fad3e..cfa5eeac7e 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java @@ -117,9 +117,10 @@ public class HousekeeperService { throw new IllegalArgumentException("Unsupported task type " + taskType); } + Future future = null; try { long startTs = System.currentTimeMillis(); - Future future = taskExecutor.submit(() -> { + future = taskExecutor.submit(() -> { taskProcessor.process((T) task); return null; }); @@ -137,7 +138,7 @@ public class HousekeeperService { if (e instanceof ExecutionException) { error = e.getCause(); } else if (e instanceof TimeoutException) { - error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds"); + error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " ms"); } if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) { @@ -153,6 +154,10 @@ public class HousekeeperService { .build()); } statsService.ifPresent(statsService -> statsService.reportFailure(taskType, msg)); + } finally { + if (future != null && !future.isDone()) { + future.cancel(true); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java index f193b154db..5ef8b2f7f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/HousekeeperTaskProcessor.java @@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient; +import java.util.concurrent.Future; + public abstract class HousekeeperTaskProcessor { @Autowired @@ -29,4 +31,13 @@ public abstract class HousekeeperTaskProcessor { public abstract HousekeeperTaskType getTaskType(); + public V wait(Future future) throws Exception { + try { + return future.get(); // will be interrupted after taskProcessingTimeout + } catch (InterruptedException e) { + future.cancel(true); // interrupting the underlying task + throw e; + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java index 1f4b6b57ae..64d0043c5a 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/LatestTsDeletionTaskProcessor.java @@ -33,7 +33,7 @@ public class LatestTsDeletionTaskProcessor extends HousekeeperTaskProcessor { + Future future = someExecutor.submit(() -> { + try { + Thread.sleep(TimeUnit.HOURS.toMillis(24)); + } catch (InterruptedException e) { + underlyingTaskInterrupted.set(true); + } + }); + try { + future.get(); + } catch (InterruptedException e) { + taskInterrupted.set(true); + future.cancel(true); + throw e; + } + return null; + }).when(tsHistoryDeletionTaskProcessor).process(any()); + + Device device = createDevice("test", "test"); + createRelatedData(device.getId()); + + doDelete("/api/device/" + device.getId()).andExpect(status().isOk()); + + int attempts = 2; + await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + for (int i = 0; i <= attempts; i++) { + int attempt = i; + verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, + task -> task.getAttempt() == attempt)), argThat(error -> error instanceof TimeoutException)); + } + }); + assertThat(taskInterrupted).isTrue(); + assertThat(underlyingTaskInterrupted).isTrue(); + + assertThat(getTimeseriesHistory(device.getId())).isNotEmpty(); + doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any()); + someExecutor.shutdown(); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(getTimeseriesHistory(device.getId())).isEmpty(); + }); + } + @Test public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception { TimeoutException error = new TimeoutException("Test timeout"); diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java index fd3ecb085c..831982e102 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.ts b/msa/js-executor/api/jsInvokeMessageProcessor.ts index 98b9c0fe0c..fe18c320bc 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.ts +++ b/msa/js-executor/api/jsInvokeMessageProcessor.ts @@ -130,7 +130,7 @@ export class JsInvokeMessageProcessor { processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); - this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); + this.logger.debug('[%s] Processing compile request, scriptId: [%s], compileRequest [%s]', requestId, scriptId, compileRequest); if (this.scriptMap.has(scriptId)) { const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true); this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId); @@ -154,7 +154,7 @@ export class JsInvokeMessageProcessor { processInvokeRequest(requestId: string, responseTopic: string, headers: any, invokeRequest: JsInvokeRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(invokeRequest); - this.logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); + this.logger.debug('[%s] Processing invoke request, scriptId: [%s], invokeRequest [%s]', requestId, scriptId, invokeRequest); this.executedScriptsCounter++; if (this.executedScriptsCounter % statFrequency == 0) { const nowMs = performance.now(); @@ -217,7 +217,7 @@ export class JsInvokeMessageProcessor { processReleaseRequest(requestId: string, responseTopic: string, headers: any, releaseRequest: JsReleaseRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(releaseRequest); - this.logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); + this.logger.debug('[%s] Processing release request, scriptId: [%s], releaseRequest [%s]', requestId, scriptId, releaseRequest); if (this.scriptMap.has(scriptId)) { const index = this.scriptIds.indexOf(scriptId); if (index > -1) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java index fa00856b4b..50a109163c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/SemaphoreWithTbMsgQueue.java @@ -115,7 +115,7 @@ public class SemaphoreWithTbMsgQueue { } TbMsg msg = tbMsgTbContext.msg(); TbContext ctx = tbMsgTbContext.ctx(); - log.warn("[{}] Failed to process message: {}", entityId, msg, t); + log.debug("[{}] Failed to process message: {}", entityId, msg, t); ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty }