diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index 28deb5b4da..7a5ca407af 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.rest; import io.netty.handler.ssl.SslContext; +import io.netty.handler.timeout.ReadTimeoutHandler; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Base64; @@ -48,6 +49,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -63,6 +65,19 @@ public class TbHttpClient { private static final String ERROR_BODY = "error_body"; private static final String ERROR_SYSTEM_PROPERTIES = "Didn't set any system proxy properties. Should be added next system proxy properties: \"http.proxyHost\" and \"http.proxyPort\" or \"https.proxyHost\" and \"https.proxyPort\" or \"socksProxyHost\" and \"socksProxyPort\""; + private static final String HTTP_PROXY_HOST = "http.proxyHost"; + private static final String HTTP_PROXY_PORT = "http.proxyPort"; + private static final String HTTPS_PROXY_HOST = "https.proxyHost"; + private static final String HTTPS_PROXY_PORT = "https.proxyPort"; + + private static final String SOCKS_PROXY_HOST = "socksProxyHost"; + private static final String SOCKS_PROXY_PORT = "socksProxyPort"; + private static final String SOCKS_VERSION = "socksProxyVersion"; + private static final String SOCKS_VERSION_5 = "5"; + private static final String SOCKS_VERSION_4 = "4"; + public static final String PROXY_USER = "tb.proxy.user"; + public static final String PROXY_PASSWORD = "tb.proxy.password"; + private final TbRestApiCallNodeConfiguration config; private WebClient webClient; @@ -75,29 +90,29 @@ public class TbHttpClient { semaphore = new Semaphore(config.getMaxParallelRequestsCount()); } - HttpClient httpClient = HttpClient.create(); + HttpClient httpClient = HttpClient.create() + .doOnConnected(c -> + c.addHandlerLast(new ReadTimeoutHandler(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS))); if (config.isEnableProxy()) { - checkProxyHost(config.getProxyHost()); - checkProxyPort(config.getProxyPort()); - if (config.isUseSystemProxyProperties()) { checkSystemProxyProperties(); - //TODO: maybe we should replace it and manually get all system props - httpClient = httpClient.proxyWithSystemProperties(); + httpClient = httpClient.proxy(this::createSystemProxyProvider); } else { checkProxyHost(config.getProxyHost()); checkProxyPort(config.getProxyPort()); - String proxyUser = config.getProxyUser(); String proxyPassword = config.getProxyPassword(); - httpClient = httpClient.proxy(options -> - options.type(ProxyProvider.Proxy.HTTP) - .host(config.getProxyHost()) - .port(config.getProxyPort()) - .username(proxyUser) - .password(u -> proxyPassword)); + httpClient = httpClient.proxy(options -> { + var o = options.type(ProxyProvider.Proxy.HTTP) + .host(config.getProxyHost()) + .port(config.getProxyPort()); + + if (useAuth(proxyUser, proxyPassword)) { + o.username(proxyUser).password(u -> proxyPassword); + } + }); } } else if (!config.isUseSimpleClientHttpFactory()) { if (CredentialsType.CERT_PEM == config.getCredentials().getType()) { @@ -135,8 +150,9 @@ public class TbHttpClient { public void processMessage(TbContext ctx, TbMsg msg) { try { - if (semaphore != null) { - semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS); + if (semaphore != null && !semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)) { + ctx.tellFailure(msg, new RuntimeException("Timeout during waiting for reply!")); + return; } String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); @@ -171,7 +187,7 @@ public class TbHttpClient { } }, throwable -> { if (semaphore != null) { - semaphore.release(); // Make sure to release in case of error + semaphore.release(); } TbMsg next = processException(ctx, msg, throwable); @@ -276,15 +292,81 @@ public class TbHttpClient { } } - private static void checkProxyHost(String proxyHost) throws TbNodeException { + private static void checkProxyHost(String proxyHost) { if (StringUtils.isEmpty(proxyHost)) { - throw new TbNodeException("Proxy host can't be empty"); + throw new IllegalArgumentException("Proxy host can't be empty"); } } - private static void checkProxyPort(int proxyPort) throws TbNodeException { + private static void checkProxyPort(int proxyPort) { if (proxyPort < 0 || proxyPort > 65535) { - throw new TbNodeException("Proxy port out of range:" + proxyPort); + throw new IllegalArgumentException("Proxy port out of range:" + proxyPort); + } + } + + private void createSystemProxyProvider(ProxyProvider.TypeSpec option) { + Properties properties = System.getProperties(); + if (properties.containsKey(HTTP_PROXY_HOST) || properties.containsKey(HTTPS_PROXY_HOST)) { + createHttpProxyFrom(option, properties); + } + if (properties.containsKey(SOCKS_PROXY_HOST)) { + createSocksProxyFrom(option, properties); + } + } + + private void createHttpProxyFrom(ProxyProvider.TypeSpec option, Properties properties) { + String hostProperty; + String portProperty; + if (properties.containsKey(HTTPS_PROXY_HOST)) { + hostProperty = HTTPS_PROXY_HOST; + portProperty = HTTPS_PROXY_PORT; + } else { + hostProperty = HTTP_PROXY_HOST; + portProperty = HTTP_PROXY_PORT; + } + + String hostname = properties.getProperty(hostProperty); + int port = Integer.parseInt(properties.getProperty(portProperty)); + + checkProxyHost(config.getProxyHost()); + checkProxyPort(config.getProxyPort()); + + var proxy = option + .type(ProxyProvider.Proxy.HTTP) + .host(hostname) + .port(port); + + var proxyUser = properties.getProperty(PROXY_USER); + var proxyPassword = properties.getProperty(PROXY_PASSWORD); + + if (useAuth(proxyUser, proxyPassword)) { + proxy.username(proxyUser).password(u -> proxyPassword); + } + } + + private void createSocksProxyFrom(ProxyProvider.TypeSpec option, Properties properties) { + String hostname = properties.getProperty(SOCKS_PROXY_HOST); + String version = properties.getProperty(SOCKS_VERSION, SOCKS_VERSION_5); + if (!SOCKS_VERSION_5.equals(version) && !SOCKS_VERSION_4.equals(version)) { + throw new IllegalArgumentException(String.format("Wrong socks version %s! Supported only socks versions 4 and 5.", version)); + } + + ProxyProvider.Proxy type = SOCKS_VERSION_5.equals(version) ? ProxyProvider.Proxy.SOCKS5 : ProxyProvider.Proxy.SOCKS4; + int port = Integer.parseInt(properties.getProperty(SOCKS_PROXY_PORT)); + + checkProxyHost(config.getProxyHost()); + checkProxyPort(config.getProxyPort()); + + ProxyProvider.Builder proxy = option + .type(type) + .host(hostname) + .port(port); + + var proxyUser = properties.getProperty(PROXY_USER); + var proxyPassword = properties.getProperty(PROXY_PASSWORD); + + if (useAuth(proxyUser, proxyPassword)) { + proxy.username(proxyUser).password(u -> proxyPassword); } }