Merge branch 'master' into develop/3.3

This commit is contained in:
Igor Kulikov 2021-02-09 16:52:13 +02:00
commit b358943d04
20 changed files with 214 additions and 43 deletions

View File

@ -88,6 +88,17 @@
<artifactId>javax.mail</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -16,15 +16,20 @@
package org.thingsboard.rule.engine.api.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -34,8 +39,11 @@ public class TbNodeUtils {
private static final ObjectMapper mapper = new ObjectMapper();
private static final String VARIABLE_TEMPLATE = "${%s}";
private static final String METADATA_VARIABLE_TEMPLATE = "${%s}";
private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])");
private static final String DATA_VARIABLE_TEMPLATE = "$[%s]";
public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
try {
@ -45,6 +53,43 @@ public class TbNodeUtils {
}
}
public static List<String> processPatterns(List<String> patterns, TbMsg tbMsg) {
if (!CollectionUtils.isEmpty(patterns)) {
return patterns.stream().map(p -> processPattern(p, tbMsg)).collect(Collectors.toList());
}
return Collections.emptyList();
}
public static String processPattern(String pattern, TbMsg tbMsg) {
try {
String result = processPattern(pattern, tbMsg.getMetaData());
JsonNode json = mapper.readTree(tbMsg.getData());
if (json.isObject()) {
Matcher matcher = DATA_PATTERN.matcher(result);
while (matcher.find()) {
String group = matcher.group(2);
String[] keys = group.split("\\.");
JsonNode jsonNode = json;
for (String key : keys) {
if (!StringUtils.isEmpty(key) && jsonNode != null) {
jsonNode = jsonNode.get(key);
} else {
jsonNode = null;
break;
}
}
if (jsonNode != null && !jsonNode.isObject() && !jsonNode.isArray()) {
result = result.replace(String.format(DATA_VARIABLE_TEMPLATE, group), jsonNode.asText());
}
}
}
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to process pattern!", e);
}
}
public static List<String> processPatterns(List<String> patterns, TbMsgMetaData metaData) {
if (!CollectionUtils.isEmpty(patterns)) {
return patterns.stream().map(p -> processPattern(p, metaData)).collect(Collectors.toList());
@ -53,15 +98,15 @@ public class TbNodeUtils {
}
public static String processPattern(String pattern, TbMsgMetaData metaData) {
String result = new String(pattern);
for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) {
String result = pattern;
for (Map.Entry<String, String> keyVal : metaData.values().entrySet()) {
result = processVar(result, keyVal.getKey(), keyVal.getValue());
}
return result;
}
private static String processVar(String pattern, String key, String val) {
String varPattern = String.format(VARIABLE_TEMPLATE, key);
String varPattern = String.format(METADATA_VARIABLE_TEMPLATE, key);
return pattern.replace(varPattern, val);
}

View File

@ -0,0 +1,115 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api.util;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
@RunWith(MockitoJUnitRunner.class)
public class TbNodeUtilsTest {
@Test
public void testSimpleReplacement() {
String pattern = "ABC ${metadata_key} $[data_key]";
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("metadata_key", "metadata_value");
ObjectNode node = JacksonUtil.newObjectNode();
node.put("data_key", "data_value");
TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node));
String result = TbNodeUtils.processPattern(pattern, msg);
Assert.assertEquals("ABC metadata_value data_value", result);
}
@Test
public void testNoReplacement() {
String pattern = "ABC ${metadata_key} $[data_key]";
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("key", "metadata_value");
ObjectNode node = JacksonUtil.newObjectNode();
node.put("key", "data_value");
TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node));
String result = TbNodeUtils.processPattern(pattern, msg);
Assert.assertEquals(pattern, result);
}
@Test
public void testSameKeysReplacement() {
String pattern = "ABC ${key} $[key]";
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("key", "metadata_value");
ObjectNode node = JacksonUtil.newObjectNode();
node.put("key", "data_value");
TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node));
String result = TbNodeUtils.processPattern(pattern, msg);
Assert.assertEquals("ABC metadata_value data_value", result);
}
@Test
public void testComplexObjectReplacement() {
String pattern = "ABC ${key} $[key1.key2.key3]";
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("key", "metadata_value");
ObjectNode key2Node = JacksonUtil.newObjectNode();
key2Node.put("key3", "value3");
ObjectNode key1Node = JacksonUtil.newObjectNode();
key1Node.set("key2", key2Node);
ObjectNode node = JacksonUtil.newObjectNode();
node.set("key1", key1Node);
TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node));
String result = TbNodeUtils.processPattern(pattern, msg);
Assert.assertEquals("ABC metadata_value value3", result);
}
@Test
public void testArrayReplacementDoesNotWork() {
String pattern = "ABC ${key} $[key1.key2[0].key3]";
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("key", "metadata_value");
ObjectNode key2Node = JacksonUtil.newObjectNode();
key2Node.put("key3", "value3");
ObjectNode key1Node = JacksonUtil.newObjectNode();
key1Node.set("key2", key2Node);
ObjectNode node = JacksonUtil.newObjectNode();
node.set("key1", key1Node);
TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node));
String result = TbNodeUtils.processPattern(pattern, msg);
Assert.assertEquals("ABC metadata_value $[key1.key2[0].key3]", result);
}
}

View File

@ -79,7 +79,7 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
protected abstract void doProcessCustomerAction(TbContext ctx, TbMsg msg, CustomerId customerId);
protected ListenableFuture<CustomerId> getCustomer(TbContext ctx, TbMsg msg) {
String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg.getMetaData());
String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg);
CustomerKey key = new CustomerKey(customerTitle);
return ctx.getDbCallbackExecutor().executeAsync(() -> {
Optional<CustomerId> customerId = customerIdCache.get(key);

View File

@ -140,7 +140,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
}
protected String processPattern(TbMsg msg, String pattern) {
return TbNodeUtils.processPattern(pattern, msg.getMetaData());
return TbNodeUtils.processPattern(pattern, msg);
}
@Data

View File

@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
@Override
protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
ListenableFuture<Alarm> alarmFuture;
if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));

View File

@ -70,7 +70,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
final Alarm msgAlarm;
if (!config.isUseMessageAlarmData()) {
alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
msgAlarm = null;
} else {
try {
@ -151,7 +151,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
.status(AlarmStatus.ACTIVE_UNACK)
.severity(config.getSeverity())
.propagate(config.isPropagate())
.type(TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData()))
.type(TbNodeUtils.processPattern(this.config.getAlarmType(), msg))
.propagateRelationTypes(relationTypes)
//todo-vp: alarm date should be taken from Message or current Time should be used?
// .startTs(System.currentTimeMillis())

View File

@ -83,7 +83,7 @@ public class TbSnsNode implements TbNode {
}
private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg.getMetaData());
String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg);
PublishRequest publishRequest = new PublishRequest()
.withTopicArn(topicArn)
.withMessage(msg.getData());

View File

@ -91,14 +91,14 @@ public class TbSqsNode implements TbNode {
}
private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg.getMetaData());
String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg);
SendMessageRequest sendMsgRequest = new SendMessageRequest();
sendMsgRequest.withQueueUrl(queueUrl);
sendMsgRequest.withMessageBody(msg.getData());
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
this.config.getMessageAttributes().forEach((k,v) -> {
String name = TbNodeUtils.processPattern(k, msg.getMetaData());
String val = TbNodeUtils.processPattern(v, msg.getMetaData());
String name = TbNodeUtils.processPattern(k, msg);
String val = TbNodeUtils.processPattern(v, msg);
messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val));
});
sendMsgRequest.setMessageAttributes(messageAttributes);

View File

@ -84,7 +84,7 @@ public class TbMsgDelayNode implements TbNode {
int periodInSeconds;
if (config.isUseMetadataPeriodInSecondsPatterns()) {
if (isParsable(msg, config.getPeriodInSecondsPattern())) {
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg.getMetaData()));
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg));
} else {
throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern());
}
@ -95,7 +95,7 @@ public class TbMsgDelayNode implements TbNode {
}
private boolean isParsable(TbMsg msg, String pattern) {
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData()));
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
}
@Override

View File

@ -91,8 +91,8 @@ public class TbPubSubNode implements TbNode {
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder();
pubsubMessageBuilder.setData(data);
this.config.getMessageAttributes().forEach((k, v) -> {
String name = TbNodeUtils.processPattern(k, msg.getMetaData());
String val = TbNodeUtils.processPattern(v, msg.getMetaData());
String name = TbNodeUtils.processPattern(k, msg);
String val = TbNodeUtils.processPattern(v, msg);
pubsubMessageBuilder.putAttributes(name, val);
});
ApiFuture<String> messageIdFuture = this.pubSubClient.publish(pubsubMessageBuilder.build());

View File

@ -94,7 +94,7 @@ public class TbKafkaNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
try {
ctx.getExternalCallExecutor().executeAsync(() -> {
publish(ctx, msg, topic);

View File

@ -72,18 +72,18 @@ public class TbMsgToEmailNode implements TbNode {
private EmailPojo convert(TbMsg msg) throws IOException {
EmailPojo.EmailPojoBuilder builder = EmailPojo.builder();
builder.from(fromTemplate(this.config.getFromTemplate(), msg.getMetaData()));
builder.to(fromTemplate(this.config.getToTemplate(), msg.getMetaData()));
builder.cc(fromTemplate(this.config.getCcTemplate(), msg.getMetaData()));
builder.bcc(fromTemplate(this.config.getBccTemplate(), msg.getMetaData()));
builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg.getMetaData()));
builder.body(fromTemplate(this.config.getBodyTemplate(), msg.getMetaData()));
builder.from(fromTemplate(this.config.getFromTemplate(), msg));
builder.to(fromTemplate(this.config.getToTemplate(), msg));
builder.cc(fromTemplate(this.config.getCcTemplate(), msg));
builder.bcc(fromTemplate(this.config.getBccTemplate(), msg));
builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg));
builder.body(fromTemplate(this.config.getBodyTemplate(), msg));
return builder.build();
}
private String fromTemplate(String template, TbMsgMetaData metaData) {
private String fromTemplate(String template, TbMsg msg) {
if (!StringUtils.isEmpty(template)) {
return TbNodeUtils.processPattern(template, metaData);
return TbNodeUtils.processPattern(template, msg);
} else {
return null;
}

View File

@ -93,10 +93,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
ListenableFuture<List<Void>> allFutures = Futures.allAsList(
putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg.getMetaData()), failuresMap),
putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg.getMetaData()), failuresMap, "cs_"),
putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg.getMetaData()), failuresMap, "shared_"),
putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg.getMetaData()), failuresMap, "ss_")
putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"),
putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"),
putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_")
);
withCallback(allFutures, i -> {
if (!failuresMap.isEmpty()) {

View File

@ -104,7 +104,7 @@ public class TbGetTelemetryNode implements TbNode {
if (config.isUseMetadataIntervalPatterns()) {
checkMetadataKeyPatterns(msg);
}
List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg.getMetaData());
List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys));
DonAsynchron.withCallback(list, data -> {
process(data, msg, keys);
@ -198,10 +198,10 @@ public class TbGetTelemetryNode implements TbNode {
Interval interval = new Interval();
if (config.isUseMetadataIntervalPatterns()) {
if (isParsable(msg, config.getStartIntervalPattern())) {
interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg.getMetaData())));
interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg)));
}
if (isParsable(msg, config.getEndIntervalPattern())) {
interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg.getMetaData())));
interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg)));
}
} else {
long ts = System.currentTimeMillis();
@ -212,7 +212,7 @@ public class TbGetTelemetryNode implements TbNode {
}
private boolean isParsable(TbMsg msg, String pattern) {
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData()));
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
}
private void checkMetadataKeyPatterns(TbMsg msg) {

View File

@ -76,7 +76,7 @@ public class TbMqttNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
.addListener(future -> {
if (future.isSuccess()) {

View File

@ -90,11 +90,11 @@ public class TbRabbitMqNode implements TbNode {
private TbMsg publishMessage(TbContext ctx, TbMsg msg) throws Exception {
String exchangeName = "";
if (!StringUtils.isEmpty(this.config.getExchangeNamePattern())) {
exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg.getMetaData());
exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg);
}
String routingKey = "";
if (!StringUtils.isEmpty(this.config.getRoutingKeyPattern())) {
routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg.getMetaData());
routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg);
}
AMQP.BasicProperties properties = null;
if (!StringUtils.isEmpty(this.config.getMessageProperties())) {

View File

@ -171,8 +171,8 @@ public class TbHttpClient {
}
public void processMessage(TbContext ctx, TbMsg msg) {
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
HttpHeaders headers = prepareHeaders(msg.getMetaData());
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
HttpHeaders headers = prepareHeaders(msg);
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
HttpEntity<String> entity = new HttpEntity<>(msg.getData(), headers);
@ -231,9 +231,9 @@ public class TbHttpClient {
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
}
private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
private HttpHeaders prepareHeaders(TbMsg msg) {
HttpHeaders headers = new HttpHeaders();
config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData)));
config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, msg), TbNodeUtils.processPattern(v, msg)));
ClientCredentials credentials = config.getCredentials();
if (CredentialsType.BASIC == credentials.getType()) {
BasicCredentials basicCredentials = (BasicCredentials) credentials;

View File

@ -72,8 +72,8 @@ public class TbSendSmsNode implements TbNode {
}
private void sendSms(TbContext ctx, TbMsg msg) throws Exception {
String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg.getMetaData());
String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg.getMetaData());
String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg);
String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg);
String[] numbersToList = numbersTo.split(",");
if (this.config.isUseSystemSmsSettings()) {
ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message);