diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java index ee8990d931..777ce3bf94 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.msg.queue; +import com.google.common.util.concurrent.SettableFuture; import org.thingsboard.server.common.data.id.EntityId; import java.util.UUID; @@ -34,7 +35,7 @@ public interface TbCallback { } }; - default UUID getId(){ + default UUID getId() { return EntityId.NULL_UUID; } @@ -42,4 +43,18 @@ public interface TbCallback { void onFailure(Throwable t); + static TbCallback wrap(SettableFuture future) { + return new TbCallback() { + @Override + public void onSuccess() { + future.set(null); + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index d243ef514d..30a9929dfb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -183,6 +183,15 @@ public abstract class TaskProcessor, R extends TaskResult> { discardedJobs.add(jobId); } + protected V wait(Future future) throws Exception { + try { + return future.get(); // will be interrupted after task processing timeout + } catch (InterruptedException e) { + future.cancel(true); // interrupting the underlying task + throw e; + } + } + @PreDestroy public void destroy() { taskConsumer.stop();