refactored HttpClient proxy
This commit is contained in:
parent
7f9a9c7fb3
commit
ae2fd08c36
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
@ -48,6 +49,7 @@ import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
@ -63,6 +65,19 @@ public class TbHttpClient {
|
||||
private static final String ERROR_BODY = "error_body";
|
||||
private static final String ERROR_SYSTEM_PROPERTIES = "Didn't set any system proxy properties. Should be added next system proxy properties: \"http.proxyHost\" and \"http.proxyPort\" or \"https.proxyHost\" and \"https.proxyPort\" or \"socksProxyHost\" and \"socksProxyPort\"";
|
||||
|
||||
private static final String HTTP_PROXY_HOST = "http.proxyHost";
|
||||
private static final String HTTP_PROXY_PORT = "http.proxyPort";
|
||||
private static final String HTTPS_PROXY_HOST = "https.proxyHost";
|
||||
private static final String HTTPS_PROXY_PORT = "https.proxyPort";
|
||||
|
||||
private static final String SOCKS_PROXY_HOST = "socksProxyHost";
|
||||
private static final String SOCKS_PROXY_PORT = "socksProxyPort";
|
||||
private static final String SOCKS_VERSION = "socksProxyVersion";
|
||||
private static final String SOCKS_VERSION_5 = "5";
|
||||
private static final String SOCKS_VERSION_4 = "4";
|
||||
public static final String PROXY_USER = "tb.proxy.user";
|
||||
public static final String PROXY_PASSWORD = "tb.proxy.password";
|
||||
|
||||
private final TbRestApiCallNodeConfiguration config;
|
||||
|
||||
private WebClient webClient;
|
||||
@ -75,29 +90,29 @@ public class TbHttpClient {
|
||||
semaphore = new Semaphore(config.getMaxParallelRequestsCount());
|
||||
}
|
||||
|
||||
HttpClient httpClient = HttpClient.create();
|
||||
HttpClient httpClient = HttpClient.create()
|
||||
.doOnConnected(c ->
|
||||
c.addHandlerLast(new ReadTimeoutHandler(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)));
|
||||
|
||||
if (config.isEnableProxy()) {
|
||||
checkProxyHost(config.getProxyHost());
|
||||
checkProxyPort(config.getProxyPort());
|
||||
|
||||
if (config.isUseSystemProxyProperties()) {
|
||||
checkSystemProxyProperties();
|
||||
//TODO: maybe we should replace it and manually get all system props
|
||||
httpClient = httpClient.proxyWithSystemProperties();
|
||||
httpClient = httpClient.proxy(this::createSystemProxyProvider);
|
||||
} else {
|
||||
checkProxyHost(config.getProxyHost());
|
||||
checkProxyPort(config.getProxyPort());
|
||||
|
||||
String proxyUser = config.getProxyUser();
|
||||
String proxyPassword = config.getProxyPassword();
|
||||
|
||||
httpClient = httpClient.proxy(options ->
|
||||
options.type(ProxyProvider.Proxy.HTTP)
|
||||
.host(config.getProxyHost())
|
||||
.port(config.getProxyPort())
|
||||
.username(proxyUser)
|
||||
.password(u -> proxyPassword));
|
||||
httpClient = httpClient.proxy(options -> {
|
||||
var o = options.type(ProxyProvider.Proxy.HTTP)
|
||||
.host(config.getProxyHost())
|
||||
.port(config.getProxyPort());
|
||||
|
||||
if (useAuth(proxyUser, proxyPassword)) {
|
||||
o.username(proxyUser).password(u -> proxyPassword);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else if (!config.isUseSimpleClientHttpFactory()) {
|
||||
if (CredentialsType.CERT_PEM == config.getCredentials().getType()) {
|
||||
@ -135,8 +150,9 @@ public class TbHttpClient {
|
||||
|
||||
public void processMessage(TbContext ctx, TbMsg msg) {
|
||||
try {
|
||||
if (semaphore != null) {
|
||||
semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS);
|
||||
if (semaphore != null && !semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)) {
|
||||
ctx.tellFailure(msg, new RuntimeException("Timeout during waiting for reply!"));
|
||||
return;
|
||||
}
|
||||
|
||||
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
|
||||
@ -171,7 +187,7 @@ public class TbHttpClient {
|
||||
}
|
||||
}, throwable -> {
|
||||
if (semaphore != null) {
|
||||
semaphore.release(); // Make sure to release in case of error
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
TbMsg next = processException(ctx, msg, throwable);
|
||||
@ -276,15 +292,81 @@ public class TbHttpClient {
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkProxyHost(String proxyHost) throws TbNodeException {
|
||||
private static void checkProxyHost(String proxyHost) {
|
||||
if (StringUtils.isEmpty(proxyHost)) {
|
||||
throw new TbNodeException("Proxy host can't be empty");
|
||||
throw new IllegalArgumentException("Proxy host can't be empty");
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkProxyPort(int proxyPort) throws TbNodeException {
|
||||
private static void checkProxyPort(int proxyPort) {
|
||||
if (proxyPort < 0 || proxyPort > 65535) {
|
||||
throw new TbNodeException("Proxy port out of range:" + proxyPort);
|
||||
throw new IllegalArgumentException("Proxy port out of range:" + proxyPort);
|
||||
}
|
||||
}
|
||||
|
||||
private void createSystemProxyProvider(ProxyProvider.TypeSpec option) {
|
||||
Properties properties = System.getProperties();
|
||||
if (properties.containsKey(HTTP_PROXY_HOST) || properties.containsKey(HTTPS_PROXY_HOST)) {
|
||||
createHttpProxyFrom(option, properties);
|
||||
}
|
||||
if (properties.containsKey(SOCKS_PROXY_HOST)) {
|
||||
createSocksProxyFrom(option, properties);
|
||||
}
|
||||
}
|
||||
|
||||
private void createHttpProxyFrom(ProxyProvider.TypeSpec option, Properties properties) {
|
||||
String hostProperty;
|
||||
String portProperty;
|
||||
if (properties.containsKey(HTTPS_PROXY_HOST)) {
|
||||
hostProperty = HTTPS_PROXY_HOST;
|
||||
portProperty = HTTPS_PROXY_PORT;
|
||||
} else {
|
||||
hostProperty = HTTP_PROXY_HOST;
|
||||
portProperty = HTTP_PROXY_PORT;
|
||||
}
|
||||
|
||||
String hostname = properties.getProperty(hostProperty);
|
||||
int port = Integer.parseInt(properties.getProperty(portProperty));
|
||||
|
||||
checkProxyHost(config.getProxyHost());
|
||||
checkProxyPort(config.getProxyPort());
|
||||
|
||||
var proxy = option
|
||||
.type(ProxyProvider.Proxy.HTTP)
|
||||
.host(hostname)
|
||||
.port(port);
|
||||
|
||||
var proxyUser = properties.getProperty(PROXY_USER);
|
||||
var proxyPassword = properties.getProperty(PROXY_PASSWORD);
|
||||
|
||||
if (useAuth(proxyUser, proxyPassword)) {
|
||||
proxy.username(proxyUser).password(u -> proxyPassword);
|
||||
}
|
||||
}
|
||||
|
||||
private void createSocksProxyFrom(ProxyProvider.TypeSpec option, Properties properties) {
|
||||
String hostname = properties.getProperty(SOCKS_PROXY_HOST);
|
||||
String version = properties.getProperty(SOCKS_VERSION, SOCKS_VERSION_5);
|
||||
if (!SOCKS_VERSION_5.equals(version) && !SOCKS_VERSION_4.equals(version)) {
|
||||
throw new IllegalArgumentException(String.format("Wrong socks version %s! Supported only socks versions 4 and 5.", version));
|
||||
}
|
||||
|
||||
ProxyProvider.Proxy type = SOCKS_VERSION_5.equals(version) ? ProxyProvider.Proxy.SOCKS5 : ProxyProvider.Proxy.SOCKS4;
|
||||
int port = Integer.parseInt(properties.getProperty(SOCKS_PROXY_PORT));
|
||||
|
||||
checkProxyHost(config.getProxyHost());
|
||||
checkProxyPort(config.getProxyPort());
|
||||
|
||||
ProxyProvider.Builder proxy = option
|
||||
.type(type)
|
||||
.host(hostname)
|
||||
.port(port);
|
||||
|
||||
var proxyUser = properties.getProperty(PROXY_USER);
|
||||
var proxyPassword = properties.getProperty(PROXY_PASSWORD);
|
||||
|
||||
if (useAuth(proxyUser, proxyPassword)) {
|
||||
proxy.username(proxyUser).password(u -> proxyPassword);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user