Rest API Call rule node. Rule Engine fixes.
This commit is contained in:
parent
280a752fc9
commit
e8496ffe16
@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
|
||||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
|
||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
|
||||
import org.thingsboard.server.service.mail.MailExecutorService;
|
||||
import org.thingsboard.server.service.rpc.DeviceRpcService;
|
||||
import org.thingsboard.server.service.script.JsExecutorService;
|
||||
@ -184,6 +185,10 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private DbCallbackExecutorService dbCallbackExecutor;
|
||||
|
||||
@Autowired
|
||||
@Getter
|
||||
private ExternalCallExecutorService externalCallExecutorService;
|
||||
|
||||
@Autowired
|
||||
@Getter
|
||||
private MailService mailService;
|
||||
|
||||
@ -122,6 +122,11 @@ class DefaultTbContext implements TbContext {
|
||||
return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(origMsg.getId(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleNodeId getSelfId() {
|
||||
return nodeCtx.getSelf().getId();
|
||||
@ -152,6 +157,11 @@ class DefaultTbContext implements TbContext {
|
||||
return mainCtx.getDbCallbackExecutor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListeningExecutor getExternalCallExecutor() {
|
||||
return mainCtx.getExternalCallExecutorService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames) {
|
||||
return new NashornJsEngine(script, functionName, argNames);
|
||||
|
||||
@ -16,7 +16,10 @@
|
||||
package org.thingsboard.server.actors.tenant;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.OneForOneStrategy;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import akka.japi.Function;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.device.DeviceActor;
|
||||
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
|
||||
@ -36,6 +39,7 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
|
||||
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -51,6 +55,12 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
this.deviceActors = new HashMap<>();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
logger.info("[{}] Starting tenant actor.", tenantId);
|
||||
@ -147,4 +157,12 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
}
|
||||
}
|
||||
|
||||
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
|
||||
@Override
|
||||
public SupervisorStrategy.Directive apply(Throwable t) {
|
||||
logger.error(t, "Unknown failure");
|
||||
return SupervisorStrategy.resume();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,33 @@
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.executors;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class ExternalCallExecutorService extends AbstractListeningExecutor {
|
||||
|
||||
@Value("${actors.rule.external_call_thread_pool_size}")
|
||||
private int externalCallExecutorThreadPoolSize;
|
||||
|
||||
@Override
|
||||
protected int getThreadPollSize() {
|
||||
return externalCallExecutorThreadPoolSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -214,6 +214,8 @@ actors:
|
||||
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
|
||||
# Specify thread pool size for mail sender executor service
|
||||
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
|
||||
# Specify thread pool size for external call service
|
||||
external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
|
||||
chain:
|
||||
# Errors for particular actor are persisted once per specified amount of milliseconds
|
||||
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
|
||||
|
||||
@ -94,7 +94,10 @@ public final class TbMsg implements Serializable {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
|
||||
RuleNodeId ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleChainIdLSB()));
|
||||
RuleNodeId ruleNodeId = null;
|
||||
if(proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
|
||||
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
|
||||
}
|
||||
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
||||
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
||||
@ -62,6 +62,8 @@ public interface TbContext {
|
||||
|
||||
TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
|
||||
|
||||
TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
|
||||
|
||||
RuleNodeId getSelfId();
|
||||
|
||||
TenantId getTenantId();
|
||||
@ -96,6 +98,8 @@ public interface TbContext {
|
||||
|
||||
ListeningExecutor getDbCallbackExecutor();
|
||||
|
||||
ListeningExecutor getExternalCallExecutor();
|
||||
|
||||
MailService getMailService();
|
||||
|
||||
ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames);
|
||||
|
||||
@ -81,6 +81,11 @@
|
||||
<artifactId>velocity-tools</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
@ -32,8 +31,6 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
|
||||
|
||||
@Slf4j
|
||||
@ -187,7 +184,7 @@ public class TbAlarmNode implements TbNode {
|
||||
} else if (alarmResult.isCleared) {
|
||||
metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
|
||||
}
|
||||
return ctx.newMsg("ALARM", originalMsg.getOriginator(), metaData, data);
|
||||
return ctx.transformMsg(originalMsg, "ALARM", originalMsg.getOriginator(), metaData, data);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.debug;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.mail;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -86,7 +85,7 @@ public class TbMsgToEmailNode implements TbNode {
|
||||
|
||||
private TbMsg buildEmailMsg(TbContext ctx, TbMsg msg, EmailPojo email) throws JsonProcessingException {
|
||||
String emailJson = MAPPER.writeValueAsString(email);
|
||||
return ctx.newMsg(SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
|
||||
return ctx.transformMsg(msg, SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
|
||||
}
|
||||
|
||||
private EmailPojo convert(TbMsg msg) throws IOException {
|
||||
|
||||
@ -0,0 +1,165 @@
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.client.*;
|
||||
import org.thingsboard.rule.engine.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
name = "rest api call",
|
||||
configClazz = TbRestApiCallNodeConfiguration.class,
|
||||
nodeDescription = "Invoke REST API calls to external REST server",
|
||||
nodeDetails = "Expects messages with any message type. Will invoke REST API call to external REST server.",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective = "tbActionNodeRestApiCallConfig"
|
||||
)
|
||||
public class TbRestApiCallNode implements TbNode {
|
||||
|
||||
private static final String VARIABLE_TEMPLATE = "${%s}";
|
||||
private static final String STATUS = "status";
|
||||
private static final String STATUS_CODE = "statusCode";
|
||||
private static final String STATUS_REASON = "statusReason";
|
||||
private static final String ERROR = "error";
|
||||
private static final String ERROR_BODY = "error_body";
|
||||
|
||||
private TbRestApiCallNodeConfiguration config;
|
||||
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
private AsyncRestTemplate httpClient;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
try {
|
||||
this.config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
Netty4ClientHttpRequestFactory nettyFactory = new Netty4ClientHttpRequestFactory(this.eventLoopGroup);
|
||||
nettyFactory.setSslContext(SslContextBuilder.forClient().build());
|
||||
httpClient = new AsyncRestTemplate(nettyFactory);
|
||||
} catch (SSLException e) {
|
||||
throw new TbNodeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
String endpointUrl = processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
|
||||
HttpHeaders headers = prepareHeaders(msg.getMetaData());
|
||||
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
|
||||
HttpEntity<String> entity = new HttpEntity<>(msg.getData(), headers);
|
||||
|
||||
ListenableFuture<ResponseEntity<String>> future =httpClient.exchange(
|
||||
endpointUrl, method, entity, String.class);
|
||||
|
||||
future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
TbMsg next = processException(ctx, msg, throwable);
|
||||
ctx.tellNext(next, TbRelationTypes.FAILURE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(ResponseEntity<String> responseEntity) {
|
||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||
TbMsg next = processResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
TbMsg next = processFailureResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.FAILURE);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
if (this.eventLoopGroup != null) {
|
||||
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
response.getHeaders().toSingleValueMap().forEach((k,v) -> metaData.putValue(k,v) );
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, response.getBody());
|
||||
}
|
||||
|
||||
private TbMsg processFailureResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData().copy();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
|
||||
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
|
||||
metaData.putValue(ERROR_BODY, response.getBody());
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData().copy();
|
||||
metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
|
||||
if (e instanceof HttpClientErrorException) {
|
||||
HttpClientErrorException httpClientErrorException = (HttpClientErrorException)e;
|
||||
metaData.putValue(STATUS, httpClientErrorException.getStatusText());
|
||||
metaData.putValue(STATUS_CODE, httpClientErrorException.getRawStatusCode()+"");
|
||||
metaData.putValue(ERROR_BODY, httpClientErrorException.getResponseBodyAsString());
|
||||
}
|
||||
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
|
||||
}
|
||||
|
||||
private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
config.getHeaders().forEach((k,v) -> {
|
||||
headers.add(processPattern(k, metaData), processPattern(v, metaData));
|
||||
});
|
||||
return headers;
|
||||
}
|
||||
|
||||
private String processPattern(String pattern, TbMsgMetaData metaData) {
|
||||
String result = new String(pattern);
|
||||
for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) {
|
||||
result = processVar(result, keyVal.getKey(), keyVal.getValue());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private String processVar(String pattern, String key, String val) {
|
||||
String varPattern = String.format(VARIABLE_TEMPLATE, key);
|
||||
return pattern.replace(varPattern, val);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright © 2016-2018 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.rest;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestApiCallNodeConfiguration> {
|
||||
|
||||
private String restEndpointUrlPattern;
|
||||
private String requestMethod;
|
||||
private Map<String, String> headers;
|
||||
|
||||
@Override
|
||||
public TbRestApiCallNodeConfiguration defaultConfiguration() {
|
||||
TbRestApiCallNodeConfiguration configuration = new TbRestApiCallNodeConfiguration();
|
||||
configuration.setRestEndpointUrlPattern("http://localhost/api");
|
||||
configuration.setRequestMethod("POST");
|
||||
configuration.setHeaders(Collections.emptyMap());
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
@ -79,10 +79,10 @@ public class TbSendRPCRequestNode implements TbNode {
|
||||
|
||||
ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
|
||||
if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
|
||||
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
|
||||
TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
ctx.tellNext(next, TbRelationTypes.FAILURE);
|
||||
ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
}
|
||||
|
||||
@ -20,7 +20,6 @@ import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.thingsboard.rule.engine.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
|
||||
@ -60,7 +59,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
|
||||
@Override
|
||||
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
|
||||
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
|
||||
return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.newMsg(msg.getType(), n, msg.getMetaData(), msg.getData()));
|
||||
return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData()));
|
||||
}
|
||||
|
||||
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -17,13 +17,9 @@ package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.AbstractListeningExecutorService;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -45,8 +41,6 @@ import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import javax.script.ScriptException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.any;
|
||||
@ -120,11 +114,12 @@ public class TbAlarmNodeTest {
|
||||
|
||||
verify(ctx).tellNext(any(), eq("Created"));
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals("ALARM", typeCaptor.getValue());
|
||||
assertEquals(originator, originatorCaptor.getValue());
|
||||
@ -214,11 +209,12 @@ public class TbAlarmNodeTest {
|
||||
|
||||
verify(ctx).tellNext(any(), eq("Created"));
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals("ALARM", typeCaptor.getValue());
|
||||
assertEquals(originator, originatorCaptor.getValue());
|
||||
@ -263,11 +259,12 @@ public class TbAlarmNodeTest {
|
||||
|
||||
verify(ctx).tellNext(any(), eq("Updated"));
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals("ALARM", typeCaptor.getValue());
|
||||
assertEquals(originator, originatorCaptor.getValue());
|
||||
@ -313,11 +310,12 @@ public class TbAlarmNodeTest {
|
||||
|
||||
verify(ctx).tellNext(any(), eq("Cleared"));
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals("ALARM", typeCaptor.getValue());
|
||||
assertEquals(originator, originatorCaptor.getValue());
|
||||
|
||||
@ -38,7 +38,6 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbMsgToEmailNodeTest {
|
||||
@ -64,11 +63,12 @@ public class TbMsgToEmailNodeTest {
|
||||
|
||||
emailNode.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
|
||||
assertEquals("SEND_EMAIL", typeCaptor.getValue());
|
||||
|
||||
@ -70,11 +70,12 @@ public class TbChangeOriginatorNodeTest {
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals(customerId, originatorCaptor.getValue());
|
||||
}
|
||||
@ -96,11 +97,12 @@ public class TbChangeOriginatorNodeTest {
|
||||
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
|
||||
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
|
||||
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
|
||||
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
|
||||
|
||||
assertEquals(customerId, originatorCaptor.getValue());
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user