rest api call node added redis queue support (#2112)
* rest api call node added redis queue support * rest api node refactoring
This commit is contained in:
parent
a2c83820c3
commit
18041c34c7
@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.api.MailService;
|
||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
|
||||
@ -336,6 +337,10 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private CassandraBufferedRateExecutor cassandraBufferedRateExecutor;
|
||||
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
public ActorSystemContext() {
|
||||
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.rule.engine.api.ListeningExecutor;
|
||||
import org.thingsboard.rule.engine.api.MailService;
|
||||
@ -60,7 +61,6 @@ import org.thingsboard.server.dao.customer.CustomerService;
|
||||
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
|
||||
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
@ -361,6 +361,16 @@ class DefaultTbContext implements TbContext {
|
||||
return mainCtx.getCassandraBufferedRateExecutor().submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisTemplate<String, Object> getRedisTemplate() {
|
||||
return mainCtx.getRedisTemplate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServerAddress() {
|
||||
return mainCtx.getServerAddress();
|
||||
}
|
||||
|
||||
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue("ruleNodeId", ruleNodeId.toString());
|
||||
|
||||
@ -67,7 +67,7 @@ public final class TbMsg implements Serializable {
|
||||
this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, clusterPartition);
|
||||
}
|
||||
|
||||
public static ByteBuffer toBytes(TbMsg msg) {
|
||||
public static byte[] toByteArray(TbMsg msg) {
|
||||
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
|
||||
builder.setId(msg.getId().toString());
|
||||
builder.setType(msg.getType());
|
||||
@ -101,8 +101,16 @@ public final class TbMsg implements Serializable {
|
||||
|
||||
builder.setDataType(msg.getDataType().ordinal());
|
||||
builder.setData(msg.getData());
|
||||
byte[] bytes = builder.build().toByteArray();
|
||||
return ByteBuffer.wrap(bytes);
|
||||
return builder.build().toByteArray();
|
||||
|
||||
}
|
||||
|
||||
public static ByteBuffer toBytes(TbMsg msg) {
|
||||
return ByteBuffer.wrap(toByteArray(msg));
|
||||
}
|
||||
|
||||
public static TbMsg fromBytes(byte[] data) {
|
||||
return fromBytes(ByteBuffer.wrap(data));
|
||||
}
|
||||
|
||||
public static TbMsg fromBytes(ByteBuffer buffer) {
|
||||
@ -115,8 +123,8 @@ public final class TbMsg implements Serializable {
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
|
||||
RuleNodeId ruleNodeId = null;
|
||||
if(proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
|
||||
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
|
||||
if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
|
||||
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
|
||||
}
|
||||
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
||||
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, proto.getClusterPartition());
|
||||
|
||||
@ -28,6 +28,7 @@ import org.springframework.data.redis.cache.RedisCacheConfiguration;
|
||||
import org.springframework.data.redis.cache.RedisCacheManager;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.format.support.DefaultFormattingConversionService;
|
||||
import org.springframework.util.Assert;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
@ -93,6 +94,13 @@ public abstract class TBRedisCacheConfiguration {
|
||||
return new PreviousDeviceCredentialsIdKeyGenerator();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedisTemplate<String, Object> redisTemplate() {
|
||||
RedisTemplate<String, Object> template = new RedisTemplate<>();
|
||||
template.setConnectionFactory(redisConnectionFactory());
|
||||
return template;
|
||||
}
|
||||
|
||||
private static void registerDefaultConverters(ConverterRegistry registry) {
|
||||
Assert.notNull(registry, "ConverterRegistry must not be null!");
|
||||
registry.addConverter(EntityId.class, String.class, EntityId::toString);
|
||||
|
||||
@ -83,5 +83,10 @@
|
||||
<artifactId>cassandra-driver-extras</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-redis</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@ -15,7 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
@ -41,8 +43,6 @@ import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -132,4 +132,7 @@ public interface TbContext {
|
||||
|
||||
ResultSetFuture submitCassandraTask(CassandraStatementTask task);
|
||||
|
||||
RedisTemplate<String, Object> getRedisTemplate();
|
||||
|
||||
String getServerAddress();
|
||||
}
|
||||
|
||||
@ -0,0 +1,153 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.client.AsyncRestTemplate;
|
||||
import org.springframework.web.client.HttpClientErrorException;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Data
|
||||
@Slf4j
|
||||
class TbHttpClient {
|
||||
|
||||
private static final String STATUS = "status";
|
||||
private static final String STATUS_CODE = "statusCode";
|
||||
private static final String STATUS_REASON = "statusReason";
|
||||
private static final String ERROR = "error";
|
||||
private static final String ERROR_BODY = "error_body";
|
||||
|
||||
private final TbRestApiCallNodeConfiguration config;
|
||||
private final boolean useRedisQueueForMsgPersistence;
|
||||
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
private AsyncRestTemplate httpClient;
|
||||
|
||||
TbHttpClient(TbRestApiCallNodeConfiguration config) throws TbNodeException {
|
||||
try {
|
||||
this.config = config;
|
||||
this.useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
|
||||
if (config.isUseSimpleClientHttpFactory()) {
|
||||
httpClient = new AsyncRestTemplate();
|
||||
} else {
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup);
|
||||
nettyFactory.setSslContext(SslContextBuilder.forClient().build());
|
||||
httpClient = new AsyncRestTemplate(nettyFactory);
|
||||
}
|
||||
} catch (SSLException e) {
|
||||
throw new TbNodeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
void destroy() {
|
||||
if (this.eventLoopGroup != null) {
|
||||
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
void processMessage(TbContext ctx, TbMsg msg, TbRedisQueueProcessor queueProcessor) {
|
||||
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
|
||||
HttpHeaders headers = prepareHeaders(msg.getMetaData());
|
||||
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
|
||||
HttpEntity<String> entity = new HttpEntity<>(msg.getData(), headers);
|
||||
|
||||
ListenableFuture<ResponseEntity<String>> future = httpClient.exchange(
|
||||
endpointUrl, method, entity, String.class);
|
||||
future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
if (useRedisQueueForMsgPersistence) {
|
||||
queueProcessor.pushOnFailure(msg);
|
||||
}
|
||||
TbMsg next = processException(ctx, msg, throwable);
|
||||
ctx.tellFailure(next, throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(ResponseEntity<String> responseEntity) {
|
||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||
if (useRedisQueueForMsgPersistence) {
|
||||
queueProcessor.resetCounter();
|
||||
}
|
||||
TbMsg next = processResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
if (useRedisQueueForMsgPersistence) {
|
||||
queueProcessor.pushOnFailure(msg);
|
||||
}
|
||||
TbMsg next = processFailureResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.FAILURE);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value() + "");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
response.getHeaders().toSingleValueMap().forEach(metaData::putValue);
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, response.getBody());
|
||||
}
|
||||
|
||||
private TbMsg processFailureResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value() + "");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
metaData.putValue(ERROR_BODY, response.getBody());
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
|
||||
if (e instanceof HttpClientErrorException) {
|
||||
HttpClientErrorException httpClientErrorException = (HttpClientErrorException) e;
|
||||
metaData.putValue(STATUS, httpClientErrorException.getStatusText());
|
||||
metaData.putValue(STATUS_CODE, httpClientErrorException.getRawStatusCode() + "");
|
||||
metaData.putValue(ERROR_BODY, httpClientErrorException.getResponseBodyAsString());
|
||||
}
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData)));
|
||||
return headers;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,125 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.ListOperations;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Data
|
||||
@Slf4j
|
||||
class TbRedisQueueProcessor {
|
||||
|
||||
private static final int MAX_QUEUE_SIZE = Integer.MAX_VALUE;
|
||||
|
||||
private final TbContext ctx;
|
||||
private final TbHttpClient httpClient;
|
||||
private final ExecutorService executor;
|
||||
private final ListOperations<String, Object> listOperations;
|
||||
private final String redisKey;
|
||||
private final boolean trimQueue;
|
||||
private final int maxQueueSize;
|
||||
|
||||
private AtomicInteger failuresCounter;
|
||||
private Future future;
|
||||
|
||||
TbRedisQueueProcessor(TbContext ctx, TbHttpClient httpClient, boolean trimQueue, int maxQueueSize) {
|
||||
this.ctx = ctx;
|
||||
this.httpClient = httpClient;
|
||||
this.executor = Executors.newSingleThreadExecutor();
|
||||
this.listOperations = ctx.getRedisTemplate().opsForList();
|
||||
this.redisKey = constructRedisKey();
|
||||
this.trimQueue = trimQueue;
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
failuresCounter = new AtomicInteger(0);
|
||||
future = executor.submit(() -> {
|
||||
while (true) {
|
||||
if (failuresCounter.get() != 0 && failuresCounter.get() % 50 == 0) {
|
||||
sleep("Target HTTP server is down...", 3);
|
||||
}
|
||||
if (listOperations.size(redisKey) > 0) {
|
||||
List<Object> list = listOperations.range(redisKey, -10, -1);
|
||||
list.forEach(obj -> {
|
||||
TbMsg msg = TbMsg.fromBytes((byte[]) obj);
|
||||
log.debug("Trying to send the message: {}", msg);
|
||||
listOperations.remove(redisKey, -1, obj);
|
||||
httpClient.processMessage(ctx, msg, this);
|
||||
});
|
||||
} else {
|
||||
sleep("Queue is empty, waiting for tasks!", 1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void destroy() {
|
||||
if (future != null) {
|
||||
future.cancel(true);
|
||||
}
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
void push(TbMsg msg) {
|
||||
listOperations.leftPush(redisKey, TbMsg.toByteArray(msg));
|
||||
if (trimQueue) {
|
||||
listOperations.trim(redisKey, 0, validateMaxQueueSize());
|
||||
}
|
||||
}
|
||||
|
||||
void pushOnFailure(TbMsg msg) {
|
||||
listOperations.rightPush(redisKey, TbMsg.toByteArray(msg));
|
||||
failuresCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
void resetCounter() {
|
||||
failuresCounter.set(0);
|
||||
}
|
||||
|
||||
private String constructRedisKey() {
|
||||
return ctx.getServerAddress() + ctx.getSelfId();
|
||||
}
|
||||
|
||||
private int validateMaxQueueSize() {
|
||||
if (maxQueueSize != 0) {
|
||||
return maxQueueSize;
|
||||
}
|
||||
return MAX_QUEUE_SIZE;
|
||||
}
|
||||
|
||||
private void sleep(String logMessage, int sleepSeconds) {
|
||||
try {
|
||||
log.debug(logMessage);
|
||||
TimeUnit.SECONDS.sleep(sleepSeconds);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException("Thread interrupted!", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -15,28 +15,17 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.client.AsyncRestTemplate;
|
||||
import org.springframework.web.client.HttpClientErrorException;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
@ -56,107 +45,40 @@ import java.util.concurrent.TimeUnit;
|
||||
)
|
||||
public class TbRestApiCallNode implements TbNode {
|
||||
|
||||
private static final String STATUS = "status";
|
||||
private static final String STATUS_CODE = "statusCode";
|
||||
private static final String STATUS_REASON = "statusReason";
|
||||
private static final String ERROR = "error";
|
||||
private static final String ERROR_BODY = "error_body";
|
||||
|
||||
private TbRestApiCallNodeConfiguration config;
|
||||
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
private AsyncRestTemplate httpClient;
|
||||
private boolean useRedisQueueForMsgPersistence;
|
||||
private TbHttpClient httpClient;
|
||||
private TbRedisQueueProcessor queueProcessor;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
try {
|
||||
this.config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
|
||||
if (this.config.isUseSimpleClientHttpFactory()) {
|
||||
httpClient = new AsyncRestTemplate();
|
||||
} else {
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup);
|
||||
nettyFactory.setSslContext(SslContextBuilder.forClient().build());
|
||||
httpClient = new AsyncRestTemplate(nettyFactory);
|
||||
TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
|
||||
httpClient = new TbHttpClient(config);
|
||||
useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence();
|
||||
if (useRedisQueueForMsgPersistence) {
|
||||
if (ctx.getRedisTemplate() == null) {
|
||||
throw new RuntimeException("Redis cache type must be used!");
|
||||
}
|
||||
} catch (SSLException e) {
|
||||
throw new TbNodeException(e);
|
||||
queueProcessor = new TbRedisQueueProcessor(ctx, httpClient, config.isTrimQueue(), config.getMaxQueueSize());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
|
||||
HttpHeaders headers = prepareHeaders(msg.getMetaData());
|
||||
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
|
||||
HttpEntity<String> entity = new HttpEntity<>(msg.getData(), headers);
|
||||
|
||||
ListenableFuture<ResponseEntity<String>> future = httpClient.exchange(
|
||||
endpointUrl, method, entity, String.class);
|
||||
|
||||
future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
TbMsg next = processException(ctx, msg, throwable);
|
||||
ctx.tellFailure(next, throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(ResponseEntity<String> responseEntity) {
|
||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||
TbMsg next = processResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
TbMsg next = processFailureResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.FAILURE);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (useRedisQueueForMsgPersistence) {
|
||||
queueProcessor.push(msg);
|
||||
} else {
|
||||
httpClient.processMessage(ctx, msg, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
if (this.eventLoopGroup != null) {
|
||||
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
|
||||
if (this.httpClient != null) {
|
||||
this.httpClient.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
response.getHeaders().toSingleValueMap().forEach(metaData::putValue);
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, response.getBody());
|
||||
}
|
||||
|
||||
private TbMsg processFailureResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
metaData.putValue(ERROR_BODY, response.getBody());
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
|
||||
if (e instanceof HttpClientErrorException) {
|
||||
HttpClientErrorException httpClientErrorException = (HttpClientErrorException)e;
|
||||
metaData.putValue(STATUS, httpClientErrorException.getStatusText());
|
||||
metaData.putValue(STATUS_CODE, httpClientErrorException.getRawStatusCode()+"");
|
||||
metaData.putValue(ERROR_BODY, httpClientErrorException.getResponseBodyAsString());
|
||||
if (this.queueProcessor != null) {
|
||||
this.queueProcessor.destroy();
|
||||
}
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
config.getHeaders().forEach((k,v) -> {
|
||||
headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData));
|
||||
});
|
||||
return headers;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -28,6 +28,9 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
private String requestMethod;
|
||||
private Map<String, String> headers;
|
||||
private boolean useSimpleClientHttpFactory;
|
||||
private boolean useRedisQueueForMsgPersistence;
|
||||
private boolean trimQueue;
|
||||
private int maxQueueSize;
|
||||
|
||||
@Override
|
||||
public TbRestApiCallNodeConfiguration defaultConfiguration() {
|
||||
@ -36,6 +39,8 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
configuration.setRequestMethod("POST");
|
||||
configuration.setHeaders(Collections.emptyMap());
|
||||
configuration.setUseSimpleClientHttpFactory(false);
|
||||
configuration.setUseRedisQueueForMsgPersistence(false);
|
||||
configuration.setTrimQueue(false);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user