diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index d7476bf08e..566430bd8d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; import reactor.netty.transport.ProxyProvider; import javax.net.ssl.SSLException; @@ -95,7 +96,12 @@ public class TbHttpClient { semaphore = new Semaphore(config.getMaxParallelRequestsCount()); } - HttpClient httpClient = HttpClient.create() + ConnectionProvider connectionProvider = ConnectionProvider + .builder("rule-engine-http-client") + .maxConnections(getPoolMaxConnections()) + .build(); + + HttpClient httpClient = HttpClient.create(connectionProvider) .runOn(getSharedOrCreateEventLoopGroup(eventLoopGroupShared)) .doOnConnected(c -> c.addHandlerLast(new ReadTimeoutHandler(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS))); @@ -143,6 +149,18 @@ public class TbHttpClient { } } + private int getPoolMaxConnections() { + String poolMaxConnectionsEnv = System.getenv("TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS"); + + int poolMaxConnections; + if (poolMaxConnectionsEnv != null) { + poolMaxConnections = Integer.parseInt(poolMaxConnectionsEnv); + } else { + poolMaxConnections = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; + } + return poolMaxConnections; + } + private void validateMaxInMemoryBufferSize(TbRestApiCallNodeConfiguration config) throws TbNodeException { int systemMaxInMemoryBufferSizeInKb = 25000; try {