From ac9e738018f272d165a9971213df0251114762b8 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 2 May 2025 17:43:14 +0300 Subject: [PATCH] Task processing timeout --- .../housekeeper/HousekeeperService.java | 1 + .../service/job/task/DummyTaskProcessor.java | 5 ++ .../server/service/job/JobManagerTest.java | 22 +++++++++ .../server/queue/task/TaskProcessor.java | 35 +++++++++++--- common/util/pom.xml | 4 ++ .../org/thingsboard/common/util/SetCache.java | 47 +++++++++++++++++++ 6 files changed, 108 insertions(+), 6 deletions(-) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/SetCache.java diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java index 727e27c971..f2d3fad357 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java @@ -165,6 +165,7 @@ public class HousekeeperService { private void stop() { consumer.stop(); consumerExecutor.shutdownNow(); + taskExecutor.shutdownNow(); log.info("Stopped Housekeeper service"); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 564142ff2b..1bcf6b36b3 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -39,6 +39,11 @@ public class DummyTaskProcessor extends TaskProcessor { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + JobResult jobResult = job.getResult(); + assertThat(jobResult.getFailedCount()).isEqualTo(1); + assertThat(((DummyTaskResult) jobResult.getResults().get(0)).getFailure().getError()).isEqualTo("Timeout after 2000 ms"); // last error + }); + } + @Test public void testCancelJob_whileRunning() throws Exception { int tasksCount = 100; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 319ddf4ab9..d243ef514d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.SetCache; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; @@ -41,7 +43,12 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public abstract class TaskProcessor, R extends TaskResult> { @@ -56,9 +63,10 @@ public abstract class TaskProcessor, R extends TaskResult> { private QueueKey queueKey; private MainQueueConsumerManager, QueueConfig> taskConsumer; + private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor")); - private final Set deletedTenants = ConcurrentHashMap.newKeySet(); - private final Set discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine + private final SetCache discardedJobs = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); + private final SetCache deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); @PostConstruct public void init() { @@ -124,21 +132,34 @@ public abstract class TaskProcessor, R extends TaskResult> { consumer.commit(); } - private void processTask(T task) throws Exception { // todo: timeout and task interruption + private void processTask(T task) throws InterruptedException { task.setAttempt(task.getAttempt() + 1); log.info("Processing task: {}", task); + Future future = null; try { - R result = process(task); + future = taskExecutor.submit(() -> process(task)); + R result; + try { + result = future.get(getTaskProcessingTimeout(), TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } catch (TimeoutException e) { + throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms"); + } reportTaskResult(task, result); } catch (InterruptedException e) { throw e; - } catch (Exception e) { + } catch (Throwable e) { log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e); if (task.getAttempt() <= task.getRetries()) { processTask(task); } else { reportTaskFailure(task, e); } + } finally { + if (future != null && !future.isDone()) { + future.cancel(true); + } } } @@ -166,8 +187,10 @@ public abstract class TaskProcessor, R extends TaskResult> { public void destroy() { taskConsumer.stop(); taskConsumer.awaitStop(); + taskExecutor.shutdownNow(); } + public abstract long getTaskProcessingTimeout(); public abstract JobType getJobType(); diff --git a/common/util/pom.xml b/common/util/pom.xml index 6719dc628a..f69cf794e9 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -116,6 +116,10 @@ exp4j ${exp4j.version} + + com.github.ben-manes.caffeine + caffeine + diff --git a/common/util/src/main/java/org/thingsboard/common/util/SetCache.java b/common/util/src/main/java/org/thingsboard/common/util/SetCache.java new file mode 100644 index 0000000000..9676434534 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/SetCache.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import java.util.concurrent.TimeUnit; + +public class SetCache { + + private static final Object DUMMY_VALUE = Boolean.TRUE; + + private final Cache cache; + + public SetCache(long valueTtlMs) { + this.cache = Caffeine.newBuilder() + .expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS) + .build(); + } + + public void add(K key) { + cache.put(key, DUMMY_VALUE); + } + + public boolean contains(K key) { + return cache.asMap().containsKey(key); + } + + public void remove(K key) { + cache.invalidate(key); + } + +}