Task processing timeout

This commit is contained in:
ViacheslavKlimov 2025-05-02 17:43:14 +03:00
parent 4c01b3d70a
commit ac9e738018
6 changed files with 108 additions and 6 deletions

View File

@ -165,6 +165,7 @@ public class HousekeeperService {
private void stop() { private void stop() {
// Stop the consumer first, then force-stop the executors with shutdownNow().
consumer.stop(); consumer.stop();
consumerExecutor.shutdownNow(); consumerExecutor.shutdownNow();
// The task executor is shut down together with the consumer executor.
taskExecutor.shutdownNow();
log.info("Stopped Housekeeper service"); log.info("Stopped Housekeeper service");
} }

View File

@ -39,6 +39,11 @@ public class DummyTaskProcessor extends TaskProcessor<DummyTask, DummyTaskResult
return DummyTaskResult.success(); return DummyTaskResult.success();
} }
/**
 * Returns the processing timeout used for dummy tasks.
 *
 * @return the timeout value, 2000
 */
@Override
public long getTaskProcessingTimeout() {
    return 2_000L;
}
@Override @Override
public JobType getJobType() { public JobType getJobType() {
return JobType.DUMMY; return JobType.DUMMY;

View File

@ -149,6 +149,28 @@ public class JobManagerTest extends AbstractControllerTest {
}); });
} }
/**
 * Submits a dummy job whose single task takes longer than the processor timeout and
 * expects the job to end up FAILED with one failed task carrying the timeout error.
 */
@Test
public void testSubmitJob_taskTimeout() {
    // bigger than DummyTaskProcessor.getTaskProcessingTimeout()
    DummyJobConfiguration slowTaskConfiguration = DummyJobConfiguration.builder()
            .successfulTasksCount(1)
            .taskProcessingTimeMs(5000)
            .build();
    Job slowJob = Job.builder()
            .tenantId(tenantId)
            .type(JobType.DUMMY)
            .key("test-job")
            .description("Test job")
            .configuration(slowTaskConfiguration)
            .build();
    JobId jobId = jobManager.submitJob(slowJob).getId();

    await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
        Job finishedJob = findJobById(jobId);
        assertThat(finishedJob.getStatus()).isEqualTo(JobStatus.FAILED);

        JobResult outcome = finishedJob.getResult();
        assertThat(outcome.getFailedCount()).isEqualTo(1);

        // last error
        DummyTaskResult firstResult = (DummyTaskResult) outcome.getResults().get(0);
        assertThat(firstResult.getFailure().getError()).isEqualTo("Timeout after 2000 ms");
    });
}
@Test @Test
public void testCancelJob_whileRunning() throws Exception { public void testCancelJob_whileRunning() throws Exception {
int tasksCount = 100; int tasksCount = 100;

View File

@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.SetCache;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.Task;
@ -41,7 +43,12 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
@ -56,9 +63,10 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
private QueueKey queueKey; private QueueKey queueKey;
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer; private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer;
// Single-threaded executor used to run process(task); its threads are created by a factory named "<job type>-task-processor".
// NOTE(review): this initializer calls the abstract getJobType() while the instance is still being constructed —
// confirm every subclass returns a value that does not depend on its own (not yet initialized) fields.
private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor"));
// Old side: unbounded concurrent key sets. New side: SetCache instances created with a 60-minute TTL.
private final Set<UUID> deletedTenants = ConcurrentHashMap.newKeySet(); private final SetCache<UUID> discardedJobs = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
private final Set<UUID> discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine private final SetCache<UUID> deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60));
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -124,21 +132,34 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
consumer.commit(); consumer.commit();
} }
private void processTask(T task) throws Exception { // todo: timeout and task interruption private void processTask(T task) throws InterruptedException {
task.setAttempt(task.getAttempt() + 1); task.setAttempt(task.getAttempt() + 1);
log.info("Processing task: {}", task); log.info("Processing task: {}", task);
Future<R> future = null;
try { try {
R result = process(task); future = taskExecutor.submit(() -> process(task));
R result;
try {
result = future.get(getTaskProcessingTimeout(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw e.getCause();
} catch (TimeoutException e) {
throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms");
}
reportTaskResult(task, result); reportTaskResult(task, result);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw e; throw e;
} catch (Exception e) { } catch (Throwable e) {
log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e); log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e);
if (task.getAttempt() <= task.getRetries()) { if (task.getAttempt() <= task.getRetries()) {
processTask(task); processTask(task);
} else { } else {
reportTaskFailure(task, e); reportTaskFailure(task, e);
} }
} finally {
if (future != null && !future.isDone()) {
future.cancel(true);
}
} }
} }
@ -166,8 +187,10 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
public void destroy() { public void destroy() {
// Stop the task consumer and wait for it to finish before touching the executor.
taskConsumer.stop(); taskConsumer.stop();
taskConsumer.awaitStop(); taskConsumer.awaitStop();
// Force-stop the task executor once the consumer has stopped.
taskExecutor.shutdownNow();
} }
/**
 * Maximum time, in milliseconds, that processTask waits for a single process(task) call
 * to complete before the attempt fails with a timeout.
 */
public abstract long getTaskProcessingTimeout();
public abstract JobType getJobType(); public abstract JobType getJobType();

View File

@ -116,6 +116,10 @@
<artifactId>exp4j</artifactId> <artifactId>exp4j</artifactId>
<version>${exp4j.version}</version> <version>${exp4j.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,47 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
/**
 * Set-like view over a Caffeine cache: keys are remembered until they are removed or until
 * the configured time has passed since they were written.
 *
 * @param <K> type of the stored keys
 */
public class SetCache<K> {

    /** Placeholder stored for every key; only the presence of the key matters. */
    private static final Object PRESENT = Boolean.TRUE;

    private final Cache<K, Object> cache;

    /**
     * @param valueTtlMs how long an added key is kept, in milliseconds, counted from the write
     */
    public SetCache(long valueTtlMs) {
        Caffeine<Object, Object> builder = Caffeine.newBuilder()
                .expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS);
        this.cache = builder.build();
    }

    /** Adds the key, storing the placeholder value for it. */
    public void add(K key) {
        this.cache.put(key, PRESENT);
    }

    /** Returns whether the cache's map view currently contains the key. */
    public boolean contains(K key) {
        return this.cache.asMap().containsKey(key);
    }

    /** Removes the key by invalidating its cache entry. */
    public void remove(K key) {
        this.cache.invalidate(key);
    }

}