diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index ffd06c0bf4..2952cfa61b 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -96,6 +96,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { builder.maxRequestTimeout(maxRequestsTimeout); builder.pollInterval(responsePollDuration); kafkaTemplate = builder.build(); + kafkaTemplate.init(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index dd1a47a693..a851e6bc80 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -48,6 +48,7 @@ public class TbKafkaRequestTemplate { private final TBKafkaProducerTemplate requestTemplate; private final TBKafkaConsumerTemplate responseTemplate; private final ConcurrentMap> pendingRequests; + private final boolean internalExecutor; private final ExecutorService executor; private final long maxRequestTimeout; private final long maxPendingRequests; @@ -69,8 +70,10 @@ public class TbKafkaRequestTemplate { this.maxPendingRequests = maxPendingRequests; this.pollInterval = pollInterval; if (executor != null) { + internalExecutor = false; this.executor = executor; } else { + internalExecutor = true; this.executor = Executors.newSingleThreadExecutor(); } } @@ -126,6 +129,9 @@ public class TbKafkaRequestTemplate { public void stop() { stopped = true; + if (internalExecutor) { + executor.shutdownNow(); + } } public ListenableFuture post(String key, Request request) {