TbHttpClient for Rest API call node: implemented shared event loop for netty for any rest api call node instance

This commit is contained in:
Sergey Matvienko 2021-09-16 14:34:24 +03:00 committed by Andrew Shvayka
parent 177c0f46ad
commit fe511f080e
3 changed files with 71 additions and 4 deletions

View File

@ -78,7 +78,7 @@ public class TbHttpClient {
private AsyncRestTemplate httpClient; private AsyncRestTemplate httpClient;
private Deque<ListenableFuture<ResponseEntity<String>>> pendingFutures; private Deque<ListenableFuture<ResponseEntity<String>>> pendingFutures;
TbHttpClient(TbRestApiCallNodeConfiguration config) throws TbNodeException { TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared) throws TbNodeException {
try { try {
this.config = config; this.config = config;
if (config.getMaxParallelRequestsCount() > 0) { if (config.getMaxParallelRequestsCount() > 0) {
@ -139,8 +139,7 @@ public class TbHttpClient {
} }
httpClient = new AsyncRestTemplate(); httpClient = new AsyncRestTemplate();
} else { } else {
this.eventLoopGroup = new NioEventLoopGroup(); Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(getSharedOrCreateEventLoopGroup(eventLoopGroupShared));
Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup);
nettyFactory.setSslContext(config.getCredentials().initSslContext()); nettyFactory.setSslContext(config.getCredentials().initSslContext());
nettyFactory.setReadTimeout(config.getReadTimeoutMs()); nettyFactory.setReadTimeout(config.getReadTimeoutMs());
httpClient = new AsyncRestTemplate(nettyFactory); httpClient = new AsyncRestTemplate(nettyFactory);
@ -150,6 +149,13 @@ public class TbHttpClient {
} }
} }
EventLoopGroup getSharedOrCreateEventLoopGroup(EventLoopGroup eventLoopGroupShared) {
if (eventLoopGroupShared != null) {
return eventLoopGroupShared;
}
return this.eventLoopGroup = new NioEventLoopGroup();
}
private void checkSystemProxyProperties() throws TbNodeException { private void checkSystemProxyProperties() throws TbNodeException {
boolean useHttpProxy = !StringUtils.isEmpty(System.getProperty("http.proxyHost")) && !StringUtils.isEmpty(System.getProperty("http.proxyPort")); boolean useHttpProxy = !StringUtils.isEmpty(System.getProperty("http.proxyHost")) && !StringUtils.isEmpty(System.getProperty("http.proxyPort"));
boolean useHttpsProxy = !StringUtils.isEmpty(System.getProperty("https.proxyHost")) && !StringUtils.isEmpty(System.getProperty("https.proxyPort")); boolean useHttpsProxy = !StringUtils.isEmpty(System.getProperty("https.proxyHost")) && !StringUtils.isEmpty(System.getProperty("https.proxyPort"));

View File

@ -51,7 +51,7 @@ public class TbRestApiCallNode implements TbNode {
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class); TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
httpClient = new TbHttpClient(config); httpClient = new TbHttpClient(config, ctx.getSharedEventLoop());
useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence(); useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
if (useRedisQueueForMsgPersistence) { if (useRedisQueueForMsgPersistence) {
log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId()); log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId());

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rest;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.mock;
public class TbHttpClientTest {
EventLoopGroup eventLoop;
TbHttpClient client;
@Before
public void setUp() throws Exception {
client = mock(TbHttpClient.class);
willCallRealMethod().given(client).getSharedOrCreateEventLoopGroup(any());
}
@After
public void tearDown() throws Exception {
if (eventLoop != null) {
eventLoop.shutdownGracefully();
}
}
@Test
public void givenSharedEventLoop_whenGetEventLoop_ThenReturnShared() {
eventLoop = mock(EventLoopGroup.class);
assertThat(client.getSharedOrCreateEventLoopGroup(eventLoop), is(eventLoop));
}
@Test
public void givenNull_whenGetEventLoop_ThenReturnShared() {
eventLoop = client.getSharedOrCreateEventLoopGroup(null);
assertThat(eventLoop, instanceOf(NioEventLoopGroup.class));
}
}