diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 365413c757..2ced33a90b 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.kafka.*; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -67,7 +68,7 @@ public class RemoteTransportApiService { @PostConstruct public void init() { - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); responseBuilder.settings(kafkaSettings); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 3137c03aeb..938584460f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -277,7 +278,7 @@ public abstract class AbstractTransportService implements TransportService { new TbRateLimits(perDevicesLimitsConf); } this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 06d3616403..b204a44302 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -31,7 +31,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -70,7 +72,7 @@ public abstract class AbstractBufferedRateExecutor(queueLimit); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); - this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads); + this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); this.perTenantLimitsEnabled = perTenantLimitsEnabled; this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;