diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java similarity index 100% rename from dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java rename to common/dao-api/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java diff --git a/rule-engine/rule-engine-api/pom.xml b/rule-engine/rule-engine-api/pom.xml index c303c6984e..bb348793c1 100644 --- a/rule-engine/rule-engine-api/pom.xml +++ b/rule-engine/rule-engine-api/pom.xml @@ -88,6 +88,17 @@ javax.mail provided + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-core + test + diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 8804320295..4f02b05c7c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -16,15 +16,20 @@ package org.thingsboard.rule.engine.api.util; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -34,8 +39,11 @@ public class TbNodeUtils { private static final ObjectMapper mapper = new ObjectMapper(); - private static final String VARIABLE_TEMPLATE = "${%s}"; + private static final String METADATA_VARIABLE_TEMPLATE = "${%s}"; + private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); + + private static final String DATA_VARIABLE_TEMPLATE = "$[%s]"; public static T convert(TbNodeConfiguration configuration, Class clazz) throws TbNodeException { try { @@ -45,6 +53,43 @@ public class TbNodeUtils { } } + public static List processPatterns(List patterns, TbMsg tbMsg) { + if (!CollectionUtils.isEmpty(patterns)) { + return patterns.stream().map(p -> processPattern(p, tbMsg)).collect(Collectors.toList()); + } + return Collections.emptyList(); + } + + public static String processPattern(String pattern, TbMsg tbMsg) { + try { + String result = processPattern(pattern, tbMsg.getMetaData()); + JsonNode json = mapper.readTree(tbMsg.getData()); + if (json.isObject()) { + Matcher matcher = DATA_PATTERN.matcher(result); + while (matcher.find()) { + String group = matcher.group(2); + String[] keys = group.split("\\."); + JsonNode jsonNode = json; + for (String key : keys) { + if (!StringUtils.isEmpty(key) && jsonNode != null) { + jsonNode = jsonNode.get(key); + } else { + jsonNode = null; + break; + } + } + + if (jsonNode != null && !jsonNode.isObject() && !jsonNode.isArray()) { + result = result.replace(String.format(DATA_VARIABLE_TEMPLATE, group), jsonNode.asText()); + } + } + } + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to process pattern!", e); + } + } + public static List processPatterns(List patterns, TbMsgMetaData metaData) { if (!CollectionUtils.isEmpty(patterns)) { return patterns.stream().map(p -> processPattern(p, metaData)).collect(Collectors.toList()); @@ -53,15 +98,15 @@ public class TbNodeUtils { } public static String processPattern(String pattern, TbMsgMetaData metaData) { - String result = new String(pattern); - for (Map.Entry keyVal : metaData.values().entrySet()) { + String result = pattern; + for (Map.Entry keyVal : metaData.values().entrySet()) { result = processVar(result, keyVal.getKey(), keyVal.getValue()); } return result; } private static String processVar(String pattern, String key, String val) { - String varPattern = String.format(VARIABLE_TEMPLATE, key); + String varPattern = String.format(METADATA_VARIABLE_TEMPLATE, key); return pattern.replace(varPattern, val); } diff --git a/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/util/TbNodeUtilsTest.java b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/util/TbNodeUtilsTest.java new file mode 100644 index 0000000000..fb95c07529 --- /dev/null +++ b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/util/TbNodeUtilsTest.java @@ -0,0 +1,115 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api.util; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; + +@RunWith(MockitoJUnitRunner.class) +public class TbNodeUtilsTest { + + @Test + public void testSimpleReplacement() { + String pattern = "ABC ${metadata_key} $[data_key]"; + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("metadata_key", "metadata_value"); + + ObjectNode node = JacksonUtil.newObjectNode(); + node.put("data_key", "data_value"); + + TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node)); + String result = TbNodeUtils.processPattern(pattern, msg); + Assert.assertEquals("ABC metadata_value data_value", result); + } + + @Test + public void testNoReplacement() { + String pattern = "ABC ${metadata_key} $[data_key]"; + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("key", "metadata_value"); + + ObjectNode node = JacksonUtil.newObjectNode(); + node.put("key", "data_value"); + + TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node)); + String result = TbNodeUtils.processPattern(pattern, msg); + Assert.assertEquals(pattern, result); + } + + @Test + public void testSameKeysReplacement() { + String pattern = "ABC ${key} $[key]"; + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("key", "metadata_value"); + + ObjectNode node = JacksonUtil.newObjectNode(); + node.put("key", "data_value"); + + TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node)); + String result = TbNodeUtils.processPattern(pattern, msg); + Assert.assertEquals("ABC metadata_value data_value", result); + } + + @Test + public void testComplexObjectReplacement() { + String pattern = "ABC ${key} $[key1.key2.key3]"; + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("key", "metadata_value"); + + ObjectNode key2Node = JacksonUtil.newObjectNode(); + key2Node.put("key3", "value3"); + + ObjectNode key1Node = JacksonUtil.newObjectNode(); + key1Node.set("key2", key2Node); + + + ObjectNode node = JacksonUtil.newObjectNode(); + node.set("key1", key1Node); + + TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node)); + String result = TbNodeUtils.processPattern(pattern, msg); + Assert.assertEquals("ABC metadata_value value3", result); + } + + @Test + public void testArrayReplacementDoesNotWork() { + String pattern = "ABC ${key} $[key1.key2[0].key3]"; + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("key", "metadata_value"); + + ObjectNode key2Node = JacksonUtil.newObjectNode(); + key2Node.put("key3", "value3"); + + ObjectNode key1Node = JacksonUtil.newObjectNode(); + key1Node.set("key2", key2Node); + + + ObjectNode node = JacksonUtil.newObjectNode(); + node.set("key1", key1Node); + + TbMsg msg = TbMsg.newMsg("CUSTOM", TenantId.SYS_TENANT_ID, md, JacksonUtil.toString(node)); + String result = TbNodeUtils.processPattern(pattern, msg); + Assert.assertEquals("ABC metadata_value $[key1.key2[0].key3]", result); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java index 30f3c59873..39f8b6fa3a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java @@ -79,7 +79,7 @@ public abstract class TbAbstractCustomerActionNode getCustomer(TbContext ctx, TbMsg msg) { - String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg.getMetaData()); + String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg); CustomerKey key = new CustomerKey(customerTitle); return ctx.getDbCallbackExecutor().executeAsync(() -> { Optional customerId = customerIdCache.get(key); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java index afa04236b5..12bbdf450d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java @@ -140,7 +140,7 @@ public abstract class TbAbstractRelationActionNode processAlarm(TbContext ctx, TbMsg msg) { - String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData()); + String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg); ListenableFuture alarmFuture; if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) { alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId())); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java index dcae930fbd..c941d6ca92 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java @@ -70,7 +70,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode messageAttributes = new HashMap<>(); this.config.getMessageAttributes().forEach((k,v) -> { - String name = TbNodeUtils.processPattern(k, msg.getMetaData()); - String val = TbNodeUtils.processPattern(v, msg.getMetaData()); + String name = TbNodeUtils.processPattern(k, msg); + String val = TbNodeUtils.processPattern(v, msg); messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val)); }); sendMsgRequest.setMessageAttributes(messageAttributes); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index d72c56981e..9192f0a55f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -84,7 +84,7 @@ public class TbMsgDelayNode implements TbNode { int periodInSeconds; if (config.isUseMetadataPeriodInSecondsPatterns()) { if (isParsable(msg, config.getPeriodInSecondsPattern())) { - periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg.getMetaData())); + periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg)); } else { throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern()); } @@ -95,7 +95,7 @@ public class TbMsgDelayNode implements TbNode { } private boolean isParsable(TbMsg msg, String pattern) { - return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData())); + return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index c04de9e070..17c49adfc7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -91,8 +91,8 @@ public class TbPubSubNode implements TbNode { PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder(); pubsubMessageBuilder.setData(data); this.config.getMessageAttributes().forEach((k, v) -> { - String name = TbNodeUtils.processPattern(k, msg.getMetaData()); - String val = TbNodeUtils.processPattern(v, msg.getMetaData()); + String name = TbNodeUtils.processPattern(k, msg); + String val = TbNodeUtils.processPattern(v, msg); pubsubMessageBuilder.putAttributes(name, val); }); ApiFuture messageIdFuture = this.pubSubClient.publish(pubsubMessageBuilder.build()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 89cc747811..acc86b1e62 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -94,7 +94,7 @@ public class TbKafkaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); + String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); try { ctx.getExternalCallExecutor().executeAsync(() -> { publish(ctx, msg, topic); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java index 854c89e720..8372ebdd10 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java @@ -72,18 +72,18 @@ public class TbMsgToEmailNode implements TbNode { private EmailPojo convert(TbMsg msg) throws IOException { EmailPojo.EmailPojoBuilder builder = EmailPojo.builder(); - builder.from(fromTemplate(this.config.getFromTemplate(), msg.getMetaData())); - builder.to(fromTemplate(this.config.getToTemplate(), msg.getMetaData())); - builder.cc(fromTemplate(this.config.getCcTemplate(), msg.getMetaData())); - builder.bcc(fromTemplate(this.config.getBccTemplate(), msg.getMetaData())); - builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg.getMetaData())); - builder.body(fromTemplate(this.config.getBodyTemplate(), msg.getMetaData())); + builder.from(fromTemplate(this.config.getFromTemplate(), msg)); + builder.to(fromTemplate(this.config.getToTemplate(), msg)); + builder.cc(fromTemplate(this.config.getCcTemplate(), msg)); + builder.bcc(fromTemplate(this.config.getBccTemplate(), msg)); + builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg)); + builder.body(fromTemplate(this.config.getBodyTemplate(), msg)); return builder.build(); } - private String fromTemplate(String template, TbMsgMetaData metaData) { + private String fromTemplate(String template, TbMsg msg) { if (!StringUtils.isEmpty(template)) { - return TbNodeUtils.processPattern(template, metaData); + return TbNodeUtils.processPattern(template, msg); } else { return null; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index 879cdd76e6..0bb8cc7cfc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -93,10 +93,10 @@ public abstract class TbAbstractGetAttributesNode> failuresMap = new ConcurrentHashMap<>(); ListenableFuture> allFutures = Futures.allAsList( - putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg.getMetaData()), failuresMap), - putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg.getMetaData()), failuresMap, "cs_"), - putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg.getMetaData()), failuresMap, "shared_"), - putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg.getMetaData()), failuresMap, "ss_") + putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap), + putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"), + putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"), + putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_") ); withCallback(allFutures, i -> { if (!failuresMap.isEmpty()) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index b34c89b4c9..f484a5551c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -104,7 +104,7 @@ public class TbGetTelemetryNode implements TbNode { if (config.isUseMetadataIntervalPatterns()) { checkMetadataKeyPatterns(msg); } - List keys = TbNodeUtils.processPatterns(tsKeyNames, msg.getMetaData()); + List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys)); DonAsynchron.withCallback(list, data -> { process(data, msg, keys); @@ -198,10 +198,10 @@ public class TbGetTelemetryNode implements TbNode { Interval interval = new Interval(); if (config.isUseMetadataIntervalPatterns()) { if (isParsable(msg, config.getStartIntervalPattern())) { - interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg.getMetaData()))); + interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg))); } if (isParsable(msg, config.getEndIntervalPattern())) { - interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg.getMetaData()))); + interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg))); } } else { long ts = System.currentTimeMillis(); @@ -212,7 +212,7 @@ public class TbGetTelemetryNode implements TbNode { } private boolean isParsable(TbMsg msg, String pattern) { - return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData())); + return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); } private void checkMetadataKeyPatterns(TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 2ecf770e19..4c732e0d97 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -76,7 +76,7 @@ public class TbMqttNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData()); + String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg); this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE) .addListener(future -> { if (future.isSuccess()) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java index 898334781e..f481c875a9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java @@ -90,11 +90,11 @@ public class TbRabbitMqNode implements TbNode { private TbMsg publishMessage(TbContext ctx, TbMsg msg) throws Exception { String exchangeName = ""; if (!StringUtils.isEmpty(this.config.getExchangeNamePattern())) { - exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg.getMetaData()); + exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg); } String routingKey = ""; if (!StringUtils.isEmpty(this.config.getRoutingKeyPattern())) { - routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg.getMetaData()); + routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg); } AMQP.BasicProperties properties = null; if (!StringUtils.isEmpty(this.config.getMessageProperties())) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index 9d47376dd4..c485489954 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -171,8 +171,8 @@ public class TbHttpClient { } public void processMessage(TbContext ctx, TbMsg msg) { - String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData()); - HttpHeaders headers = prepareHeaders(msg.getMetaData()); + String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); + HttpHeaders headers = prepareHeaders(msg); HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); HttpEntity entity = new HttpEntity<>(msg.getData(), headers); @@ -231,9 +231,9 @@ public class TbHttpClient { return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData()); } - private HttpHeaders prepareHeaders(TbMsgMetaData metaData) { + private HttpHeaders prepareHeaders(TbMsg msg) { HttpHeaders headers = new HttpHeaders(); - config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData))); + config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, msg), TbNodeUtils.processPattern(v, msg))); ClientCredentials credentials = config.getCredentials(); if (CredentialsType.BASIC == credentials.getType()) { BasicCredentials basicCredentials = (BasicCredentials) credentials; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index 0061ff1cf4..df75afe086 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -72,8 +72,8 @@ public class TbSendSmsNode implements TbNode { } private void sendSms(TbContext ctx, TbMsg msg) throws Exception { - String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg.getMetaData()); - String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg.getMetaData()); + String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg); + String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg); String[] numbersToList = numbersTo.split(","); if (this.config.isUseSystemSmsSettings()) { ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message);