Created AWS Lambda node (#10646)
* created aws lambda node * updated logic for processing input keys * removed logic for parsing mappings * added possibility to pass qualifier using patterns * added check for function name and invocation type * tests refactoring after review * removed ability to pass invocation type to the config * added node teplatization doc * changed title for documenatation * added uiResources and configDirective * changed names of classes * removed false param from TbNodeException in init method --------- Co-authored-by: ShvaykaD <dshvaika@thingsboard.io>
This commit is contained in:
parent
08bd16e537
commit
7480e5f099
5
pom.xml
5
pom.xml
@ -1999,6 +1999,11 @@
|
|||||||
</exclusion>
|
</exclusion>
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.amazonaws</groupId>
|
||||||
|
<artifactId>aws-java-sdk-lambda</artifactId>
|
||||||
|
<version>${aws.sdk.version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.google.cloud</groupId>
|
<groupId>com.google.cloud</groupId>
|
||||||
<artifactId>google-cloud-pubsub</artifactId>
|
<artifactId>google-cloud-pubsub</artifactId>
|
||||||
|
|||||||
@ -100,6 +100,10 @@
|
|||||||
<groupId>com.amazonaws</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>aws-java-sdk-sns</artifactId>
|
<artifactId>aws-java-sdk-sns</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.amazonaws</groupId>
|
||||||
|
<artifactId>aws-java-sdk-lambda</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.google.cloud</groupId>
|
<groupId>com.google.cloud</groupId>
|
||||||
<artifactId>google-cloud-pubsub</artifactId>
|
<artifactId>google-cloud-pubsub</artifactId>
|
||||||
|
|||||||
@ -0,0 +1,151 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.aws.lambda;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.BasicAWSCredentials;
|
||||||
|
import com.amazonaws.handlers.AsyncHandler;
|
||||||
|
import com.amazonaws.services.lambda.AWSLambdaAsync;
|
||||||
|
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeRequest;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeResult;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.rule.engine.api.RuleNode;
|
||||||
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||||
|
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
|
||||||
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RuleNode(
|
||||||
|
type = ComponentType.EXTERNAL,
|
||||||
|
name = "aws lambda",
|
||||||
|
configClazz = TbAwsLambdaNodeConfiguration.class,
|
||||||
|
nodeDescription = "Publish message to the AWS Lambda",
|
||||||
|
nodeDetails = "Publishes messages to AWS Lambda, a service that lets you run code " +
|
||||||
|
"without provisioning or managing servers. " +
|
||||||
|
"It sends messages using a RequestResponse invocation type. " +
|
||||||
|
"The node uses a pre-configured client and specified function to run.<br><br>" +
|
||||||
|
"Output connections: <code>Success</code>, <code>Failure</code>.",
|
||||||
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||||
|
configDirective = "tbExternalNodeLambdaConfig",
|
||||||
|
iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCIgd2lkdGg9IjQ4IiBoZWlnaHQ9IjQ4Ij48cGF0aCBkPSJNMTMuMjMgMTAuNTZWMTBjLTEuOTQgMC0zLjk5LjM5LTMuOTkgMi42NyAwIDEuMTYuNjEgMS45NSAxLjYzIDEuOTUuNzYgMCAxLjQzLS40NyAxLjg2LTEuMjIuNTItLjkzLjUtMS44LjUtMi44NG0yLjcgNi41M2MtLjE4LjE2LS40My4xNy0uNjMuMDYtLjg5LS43NC0xLjA1LTEuMDgtMS41NC0xLjc5LTEuNDcgMS41LTIuNTEgMS45NS00LjQyIDEuOTUtMi4yNSAwLTQuMDEtMS4zOS00LjAxLTQuMTcgMC0yLjE4IDEuMTctMy42NCAyLjg2LTQuMzggMS40Ni0uNjQgMy40OS0uNzYgNS4wNC0uOTNWNy41YzAtLjY2LjA1LTEuNDEtLjMzLTEuOTYtLjMyLS40OS0uOTUtLjctMS41LS43LTEuMDIgMC0xLjkzLjUzLTIuMTUgMS42MS0uMDUuMjQtLjI1LjQ4LS40Ny40OWwtMi42LS4yOGMtLjIyLS4wNS0uNDYtLjIyLS40LS41Ni42LTMuMTUgMy40NS00LjEgNi00LjEgMS4zIDAgMyAuMzUgNC4wMyAxLjMzQzE3LjExIDQuNTUgMTcgNi4xOCAxNyA3Ljk1djQuMTdjMCAxLjI1LjUgMS44MSAxIDIuNDguMTcuMjUuMjEuNTQgMCAuNzFsLTIuMDYgMS43OGgtLjAxIj48L3BhdGg+PHBhdGggZD0iTTIwLjE2IDE5LjU0QzE4IDIxLjE0IDE0LjgyIDIyIDEyLjEgMjJjLTMuODEgMC03LjI1LTEuNDEtOS44NS0zLjc2LS4yLS4xOC0uMDItLjQzLjI1LS4yOSAyLjc4IDEuNjMgNi4yNSAyLjYxIDkuODMgMi42MSAyLjQxIDAgNS4wNy0uNSA3LjUxLTEuNTMuMzctLjE2LjY2LjI0LjMyLjUxIj48L3BhdGg+PHBhdGggZD0iTTIxLjA3IDE4LjVjLS4yOC0uMzYtMS44NS0uMTctMi41Ny0uMDgtLjE5LjAyLS4yMi0uMTYtLjAzLS4zIDEuMjQtLjg4IDMuMjktLjYyIDMuNTMtLjMzLjI0LjMtLjA3IDIuMzUtMS4yNCAzLjMyLS4xOC4xNi0uMzUuMDctLjI2LS4xMS4yNi0uNjcuODUtMi4xNC41Ny0yLjV6Ij48L3BhdGg+PC9zdmc+"
|
||||||
|
)
|
||||||
|
public class TbAwsLambdaNode extends TbAbstractExternalNode {
|
||||||
|
|
||||||
|
private TbAwsLambdaNodeConfiguration config;
|
||||||
|
private AWSLambdaAsync client;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
|
config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class);
|
||||||
|
if (StringUtils.isBlank(config.getFunctionName())) {
|
||||||
|
throw new TbNodeException("Function name must be set!", true);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
|
||||||
|
client = AWSLambdaAsyncClientBuilder.standard()
|
||||||
|
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
|
||||||
|
.withRegion(config.getRegion())
|
||||||
|
.withClientConfiguration(new ClientConfiguration()
|
||||||
|
.withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout()))
|
||||||
|
.withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout())))
|
||||||
|
.build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new TbNodeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||||
|
var tbMsg = ackIfNeeded(ctx, msg);
|
||||||
|
String functionName = TbNodeUtils.processPattern(config.getFunctionName(), tbMsg);
|
||||||
|
String qualifier = StringUtils.isBlank(config.getQualifier()) ?
|
||||||
|
TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER :
|
||||||
|
TbNodeUtils.processPattern(config.getQualifier(), tbMsg);
|
||||||
|
InvokeRequest request = toRequest(tbMsg.getData(), functionName, qualifier);
|
||||||
|
client.invokeAsync(request, new AsyncHandler<>() {
|
||||||
|
@Override
|
||||||
|
public void onError(Exception e) {
|
||||||
|
tellFailure(ctx, tbMsg, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(InvokeRequest request, InvokeResult invokeResult) {
|
||||||
|
try {
|
||||||
|
if (config.isTellFailureIfFuncThrowsExc() && invokeResult.getFunctionError() != null) {
|
||||||
|
throw new RuntimeException(getPayload(invokeResult));
|
||||||
|
}
|
||||||
|
tellSuccess(ctx, getResponseMsg(tbMsg, invokeResult));
|
||||||
|
} catch (Exception e) {
|
||||||
|
tellFailure(ctx, processException(tbMsg, invokeResult, e), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private InvokeRequest toRequest(String requestBody, String functionName, String qualifier) {
|
||||||
|
return new InvokeRequest()
|
||||||
|
.withFunctionName(functionName)
|
||||||
|
.withPayload(requestBody)
|
||||||
|
.withQualifier(qualifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getPayload(InvokeResult invokeResult) {
|
||||||
|
ByteBuffer buf = invokeResult.getPayload();
|
||||||
|
if (buf == null) {
|
||||||
|
throw new RuntimeException("Payload from result of AWS Lambda function execution is null.");
|
||||||
|
}
|
||||||
|
byte[] responseBytes = new byte[buf.remaining()];
|
||||||
|
buf.get(responseBytes);
|
||||||
|
return new String(responseBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TbMsg getResponseMsg(TbMsg originalMsg, InvokeResult invokeResult) {
|
||||||
|
TbMsgMetaData metaData = originalMsg.getMetaData().copy();
|
||||||
|
metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId());
|
||||||
|
String data = getPayload(invokeResult);
|
||||||
|
return TbMsg.transformMsg(originalMsg, metaData, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TbMsg processException(TbMsg origMsg, InvokeResult invokeResult, Throwable t) {
|
||||||
|
TbMsgMetaData metaData = origMsg.getMetaData().copy();
|
||||||
|
metaData.putValue("error", t.getClass() + ": " + t.getMessage());
|
||||||
|
metaData.putValue("requestId", invokeResult.getSdkResponseMetadata().getRequestId());
|
||||||
|
return TbMsg.transformMsgMetadata(origMsg, metaData);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
if (client != null) {
|
||||||
|
try {
|
||||||
|
client.shutdown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to shutdown Lambda client during destroy", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,46 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.aws.lambda;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class TbAwsLambdaNodeConfiguration implements NodeConfiguration<TbAwsLambdaNodeConfiguration> {
|
||||||
|
|
||||||
|
public static final String DEFAULT_QUALIFIER = "$LATEST";
|
||||||
|
|
||||||
|
private String accessKey;
|
||||||
|
private String secretKey;
|
||||||
|
private String region;
|
||||||
|
private String functionName;
|
||||||
|
private String qualifier;
|
||||||
|
private int connectionTimeout;
|
||||||
|
private int requestTimeout;
|
||||||
|
private boolean tellFailureIfFuncThrowsExc;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbAwsLambdaNodeConfiguration defaultConfiguration() {
|
||||||
|
TbAwsLambdaNodeConfiguration configuration = new TbAwsLambdaNodeConfiguration();
|
||||||
|
configuration.setRegion("us-east-1");
|
||||||
|
configuration.setQualifier(DEFAULT_QUALIFIER);
|
||||||
|
configuration.setConnectionTimeout(10);
|
||||||
|
configuration.setRequestTimeout(5);
|
||||||
|
configuration.setTellFailureIfFuncThrowsExc(false);
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,308 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.aws.lambda;
|
||||||
|
|
||||||
|
import com.amazonaws.ResponseMetadata;
|
||||||
|
import com.amazonaws.handlers.AsyncHandler;
|
||||||
|
import com.amazonaws.services.lambda.AWSLambdaAsync;
|
||||||
|
import com.amazonaws.services.lambda.model.AWSLambdaException;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeRequest;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeResult;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
import org.junit.jupiter.params.provider.NullAndEmptySource;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||||
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||||
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.thingsboard.rule.engine.aws.lambda.TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class TbAwsLambdaNodeTest {
|
||||||
|
|
||||||
|
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ddb88645-7379-4a08-a51c-e84a0b4b3d88"));
|
||||||
|
|
||||||
|
private TbAwsLambdaNode node;
|
||||||
|
private TbAwsLambdaNodeConfiguration config;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private TbContext ctx;
|
||||||
|
@Mock
|
||||||
|
private AWSLambdaAsync clientMock;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
node = new TbAwsLambdaNode();
|
||||||
|
config = new TbAwsLambdaNodeConfiguration().defaultConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void verifyDefaultConfig() {
|
||||||
|
assertThat(config.getAccessKey()).isNull();
|
||||||
|
assertThat(config.getSecretKey()).isNull();
|
||||||
|
assertThat(config.getRegion()).isEqualTo(("us-east-1"));
|
||||||
|
assertThat(config.getFunctionName()).isNull();
|
||||||
|
assertThat(config.getQualifier()).isEqualTo(DEFAULT_QUALIFIER);
|
||||||
|
assertThat(config.getConnectionTimeout()).isEqualTo(10);
|
||||||
|
assertThat(config.getRequestTimeout()).isEqualTo(5);
|
||||||
|
assertThat(config.isTellFailureIfFuncThrowsExc()).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@NullAndEmptySource
|
||||||
|
@ValueSource(strings = " ")
|
||||||
|
public void givenInvalidFunctionName_whenInit_thenThrowsException(String funcName) {
|
||||||
|
config.setFunctionName(funcName);
|
||||||
|
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||||
|
assertThatThrownBy(() -> node.init(ctx, configuration))
|
||||||
|
.isInstanceOf(TbNodeException.class)
|
||||||
|
.hasMessage("Function name must be set!");
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource
|
||||||
|
public void givenRequest_whenOnMsg_thenTellSuccess(String data, TbMsgMetaData metadata, String functionName, String qualifier, String expectedQualifier) {
|
||||||
|
init();
|
||||||
|
config.setFunctionName(functionName);
|
||||||
|
config.setQualifier(qualifier);
|
||||||
|
|
||||||
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data);
|
||||||
|
|
||||||
|
InvokeRequest request = createInvokeRequest(msg);
|
||||||
|
String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23";
|
||||||
|
String funcResponsePayload = "{\"statusCode\":200}";
|
||||||
|
|
||||||
|
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
|
||||||
|
InvokeResult result = new InvokeResult();
|
||||||
|
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
|
||||||
|
result.setPayload(ByteBuffer.wrap(funcResponsePayload.getBytes()));
|
||||||
|
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
|
||||||
|
asyncHandler.onSuccess(request, result);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
node.onMsg(ctx, msg);
|
||||||
|
|
||||||
|
ArgumentCaptor<InvokeRequest> invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class);
|
||||||
|
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
|
||||||
|
verify(clientMock).invokeAsync(invokeRequestCaptor.capture(), any());
|
||||||
|
verify(ctx).tellSuccess(msgCaptor.capture());
|
||||||
|
|
||||||
|
assertThat(invokeRequestCaptor.getValue().getQualifier()).isEqualTo(expectedQualifier);
|
||||||
|
TbMsgMetaData resultMsgMetadata = metadata.copy();
|
||||||
|
resultMsgMetadata.putValue("requestId", requestIdStr);
|
||||||
|
TbMsg resultedMsg = TbMsg.transformMsg(msg, resultMsgMetadata, funcResponsePayload);
|
||||||
|
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
|
||||||
|
.ignoringFields("ctx")
|
||||||
|
.isEqualTo(resultedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> givenRequest_whenOnMsg_thenTellSuccess() {
|
||||||
|
return Stream.of(
|
||||||
|
Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", "qualifierA", "qualifierA"),
|
||||||
|
Arguments.of(TbMsg.EMPTY_JSON_OBJECT, TbMsgMetaData.EMPTY, "functionA", null, DEFAULT_QUALIFIER),
|
||||||
|
Arguments.of("{\"funcNameMsgPattern\": \"functionB\", \"qualifierMsgPattern\": \"qualifierB\"}",
|
||||||
|
TbMsgMetaData.EMPTY, "$[funcNameMsgPattern]", "$[qualifierMsgPattern]", "qualifierB"),
|
||||||
|
Arguments.of(TbMsg.EMPTY_JSON_OBJECT,
|
||||||
|
new TbMsgMetaData(
|
||||||
|
Map.of(
|
||||||
|
"funcNameMdPattern", "functionC",
|
||||||
|
"qualifierMdPattern", "qualifierC")
|
||||||
|
), "${funcNameMdPattern}", "${qualifierMdPattern}", "qualifierC")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsTrue_whenOnMsg_thenTellFailure() {
|
||||||
|
init();
|
||||||
|
config.setTellFailureIfFuncThrowsExc(true);
|
||||||
|
|
||||||
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
|
||||||
|
InvokeRequest request = createInvokeRequest(msg);
|
||||||
|
String requestIdStr = "a124af57-e7c3-4ebb-83bf-b09ff86eaa23";
|
||||||
|
String errorMsg = "Unhandled exception from function";
|
||||||
|
|
||||||
|
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
|
||||||
|
InvokeResult result = new InvokeResult();
|
||||||
|
result.setPayload(ByteBuffer.wrap(errorMsg.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
result.setFunctionError(errorMsg);
|
||||||
|
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
|
||||||
|
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
|
||||||
|
asyncHandler.onSuccess(request, result);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
node.onMsg(ctx, msg);
|
||||||
|
|
||||||
|
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||||
|
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
|
||||||
|
verify(clientMock).invokeAsync(eq(request), any());
|
||||||
|
verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture());
|
||||||
|
|
||||||
|
var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr);
|
||||||
|
TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata));
|
||||||
|
|
||||||
|
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
|
||||||
|
.ignoringFields("ctx")
|
||||||
|
.isEqualTo(resultedMsg);
|
||||||
|
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenExceptionWasThrownInsideFunctionAndTellFailureIfFuncThrowsExcIsFalse_whenOnMsg_thenTellSuccess() {
|
||||||
|
init();
|
||||||
|
|
||||||
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||||
|
InvokeRequest request = createInvokeRequest(msg);
|
||||||
|
String requestIdStr = "e83dfbc4-68d5-441c-8ee9-289959a30d3b";
|
||||||
|
String payload = "{\"errorMessage\":\"Something went wrong\",\"errorType\":\"Exception\",\"requestId\":\"" + requestIdStr + "\"}";
|
||||||
|
|
||||||
|
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
|
||||||
|
InvokeResult result = new InvokeResult();
|
||||||
|
result.setSdkResponseMetadata(new ResponseMetadata(Map.of("AWS_REQUEST_ID", requestIdStr)));
|
||||||
|
result.setPayload(ByteBuffer.wrap(payload.getBytes()));
|
||||||
|
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
|
||||||
|
asyncHandler.onSuccess(request, result);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
node.onMsg(ctx, msg);
|
||||||
|
|
||||||
|
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
|
||||||
|
verify(clientMock).invokeAsync(eq(request), any());
|
||||||
|
verify(ctx).tellSuccess(msgCaptor.capture());
|
||||||
|
|
||||||
|
Map<String, String> metadata = Map.of("requestId", requestIdStr);
|
||||||
|
TbMsg resultedMsg = TbMsg.transformMsg(msg, new TbMsgMetaData(metadata), payload);
|
||||||
|
|
||||||
|
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
|
||||||
|
.ignoringFields("ctx")
|
||||||
|
.isEqualTo(resultedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenPayloadFromResultIsNull_whenOnMsg_thenTellFailure() {
|
||||||
|
init();
|
||||||
|
|
||||||
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||||
|
InvokeRequest request = createInvokeRequest(msg);
|
||||||
|
String requestIdStr = "12bbb074-e2fc-4381-8f28-d4bd235103d5";
|
||||||
|
String errorMsg = "Payload from result of AWS Lambda function execution is null.";
|
||||||
|
|
||||||
|
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
|
||||||
|
InvokeResult result = new InvokeResult();
|
||||||
|
result.setSdkResponseMetadata(new ResponseMetadata(Map.of(ResponseMetadata.AWS_REQUEST_ID, requestIdStr)));
|
||||||
|
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
|
||||||
|
asyncHandler.onSuccess(request, result);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
node.onMsg(ctx, msg);
|
||||||
|
|
||||||
|
verify(clientMock).invokeAsync(eq(request), any());
|
||||||
|
|
||||||
|
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||||
|
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
|
||||||
|
verify(ctx).tellFailure(msgCaptor.capture(), throwableCaptor.capture());
|
||||||
|
|
||||||
|
var metadata = Map.of("error", RuntimeException.class + ": " + errorMsg, "requestId", requestIdStr);
|
||||||
|
TbMsg resultedMsg = TbMsg.transformMsgMetadata(msg, new TbMsgMetaData(metadata));
|
||||||
|
|
||||||
|
assertThat(msgCaptor.getValue()).usingRecursiveComparison()
|
||||||
|
.ignoringFields("ctx")
|
||||||
|
.isEqualTo(resultedMsg);
|
||||||
|
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenExceptionWasThrownOnAWS_whenOnMsg_thenTellFailure() {
|
||||||
|
init();
|
||||||
|
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||||
|
InvokeRequest request = createInvokeRequest(msg);
|
||||||
|
|
||||||
|
String errorMsg = "Simulated error";
|
||||||
|
when(clientMock.invokeAsync(any(), any())).then(invocation -> {
|
||||||
|
AsyncHandler<InvokeRequest, InvokeResult> asyncHandler = invocation.getArgument(1);
|
||||||
|
asyncHandler.onError(new AWSLambdaException(errorMsg));
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
node.onMsg(ctx, msg);
|
||||||
|
|
||||||
|
verify(clientMock).invokeAsync(eq(request), any());
|
||||||
|
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||||
|
verify(ctx).tellFailure(eq(msg), throwableCaptor.capture());
|
||||||
|
assertThat(throwableCaptor.getValue()).isInstanceOf(AWSLambdaException.class).hasMessageStartingWith(errorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init() {
|
||||||
|
config.setAccessKey("accessKey");
|
||||||
|
config.setSecretKey("secretKey");
|
||||||
|
config.setFunctionName("new-function");
|
||||||
|
ReflectionTestUtils.setField(node, "client", clientMock);
|
||||||
|
ReflectionTestUtils.setField(node, "config", config);
|
||||||
|
}
|
||||||
|
|
||||||
|
private InvokeRequest createInvokeRequest(TbMsg msg) {
|
||||||
|
return new InvokeRequest()
|
||||||
|
.withFunctionName(getFunctionName(msg))
|
||||||
|
.withPayload(msg.getData())
|
||||||
|
.withQualifier(getQualifier(msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getQualifier(TbMsg msg) {
|
||||||
|
return StringUtils.isBlank(config.getQualifier()) ?
|
||||||
|
TbAwsLambdaNodeConfiguration.DEFAULT_QUALIFIER :
|
||||||
|
TbNodeUtils.processPattern(config.getQualifier(), msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getFunctionName(TbMsg msg) {
|
||||||
|
return TbNodeUtils.processPattern(config.getFunctionName(), msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,83 @@
|
|||||||
|
# Rule nodes fields templatization
|
||||||
|
|
||||||
|
Templatization is the process of using predefined templates to dynamically insert or substitute values into text.
|
||||||
|
These templates serve as placeholders for variables that can be filled in later with actual data.
|
||||||
|
|
||||||
|
In the context of rule engine, templates are used to extract data from incoming messages during runtime.
|
||||||
|
This is particularly helpful in the rule node configuration, where templatization allows for dynamic configuration by replacing static values in the configuration fields with real-time values from the incoming messages.
|
||||||
|
This enables more flexible and automated handling of data, making it easier to perform conditional operations based on varying inputs.
|
||||||
|
|
||||||
|
## Syntax
|
||||||
|
|
||||||
|
Templates start with a dollar sign (`$`), followed by brackets with a key name inside.
|
||||||
|
Square brackets (`[]`) are used for message keys, while curly brackets (`{}`) are used for message metadata keys.
|
||||||
|
For example:
|
||||||
|
- `$[messageKey]` - will extract value of `messageKey` from incoming message.
|
||||||
|
- `${metadataKey}` - will extract value of `metadataKey` from incoming message metadata.
|
||||||
|
|
||||||
|
In the example above, `messageKey` and `metadataKey` represent any key name that may exist within the message or its metadata.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
Let's review an example. First JSON is message, second is message metadata:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"temperature": 26.5,
|
||||||
|
"humidity": 75.2,
|
||||||
|
"soilMoisture": 28.9,
|
||||||
|
"windSpeed": 26.2,
|
||||||
|
"location": "riverside"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"deviceType": "weather_sensor",
|
||||||
|
"deviceName": "weather1",
|
||||||
|
"ts": "1685379440000"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Assume, we detected an unusually high wind speed and want to send this telemetry reading to some external REST API.
|
||||||
|
Every reading needs to be associated with specific device and location - this information is available only in real-time.
|
||||||
|
We can use templates extract necessary data and to construct URL for sending data:
|
||||||
|
|
||||||
|
`example-base-url.com/report-reading?location=$[location]&deviceName=${deviceName}`
|
||||||
|
|
||||||
|
This template will be resolved to:
|
||||||
|
|
||||||
|
`example-base-url.com/report-reading?location=riverside&deviceName=weather1`
|
||||||
|
|
||||||
|
Templates are ideal for scenarios where the specific values aren't known at the time of configuration but will become available at runtime.
|
||||||
|
|
||||||
|
## Notes
|
||||||
|
|
||||||
|
- Templates can be combined with regular text. For example: "Fuel tanks are filled to `$[fuelLevel]`%".
|
||||||
|
- You can access nested keys in JSON object using dot notation: `$[object.key]`.
|
||||||
|
- If specified key is missing or value associated with that key is an object or an array, then template string will be returned unchanged.
|
||||||
|
|
||||||
|
To illustrate written above let's review an example. Here's content of a message:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"number": 123.45,
|
||||||
|
"string": "text",
|
||||||
|
"boolean": true,
|
||||||
|
"array": [1, 2, 3],
|
||||||
|
"object": {
|
||||||
|
"property": "propertyValue"
|
||||||
|
},
|
||||||
|
"null": null
|
||||||
|
}
|
||||||
|
```
|
||||||
|
Here's a table with comparison between templates and extracted values:
|
||||||
|
|
||||||
|
| **Template** | **Extracted value** |
|
||||||
|
|--------------------|---------------------|
|
||||||
|
| $[number] | 123.45 |
|
||||||
|
| $[string] | text |
|
||||||
|
| $[boolean] | true |
|
||||||
|
| $[array] | $[array] |
|
||||||
|
| $[object] | $[object] |
|
||||||
|
| $[object.property] | propertyValue |
|
||||||
|
| $[null] | null |
|
||||||
|
| $[doesNotExist] | $[doesNotExist] |
|
||||||
Loading…
x
Reference in New Issue
Block a user