added ability to use pattern to substitute variables from data in rule nodes
This commit is contained in:
		
							parent
							
								
									b89e22e139
								
							
						
					
					
						commit
						caa5c43288
					
				@ -16,15 +16,20 @@
 | 
			
		||||
package org.thingsboard.rule.engine.api.util;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.core.JsonProcessingException;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.springframework.util.CollectionUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.regex.Matcher;
 | 
			
		||||
import java.util.regex.Pattern;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
@ -34,8 +39,11 @@ public class TbNodeUtils {
 | 
			
		||||
 | 
			
		||||
    private static final ObjectMapper mapper = new ObjectMapper();
 | 
			
		||||
 | 
			
		||||
    private static final String VARIABLE_TEMPLATE = "${%s}";
 | 
			
		||||
    private static final String METADATA_VARIABLE_TEMPLATE = "${%s}";
 | 
			
		||||
 | 
			
		||||
    private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(\\])");
 | 
			
		||||
 | 
			
		||||
    private static final String DATA_VARIABLE_TEMPLATE = "$[%s]";
 | 
			
		||||
 | 
			
		||||
    public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
 | 
			
		||||
        try {
 | 
			
		||||
@ -45,6 +53,44 @@ public class TbNodeUtils {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static List<String> processPatterns(List<String> patterns, TbMsg tbMsg) {
 | 
			
		||||
        if (!CollectionUtils.isEmpty(patterns)) {
 | 
			
		||||
            return patterns.stream().map(p -> processPattern(p, tbMsg)).collect(Collectors.toList());
 | 
			
		||||
        }
 | 
			
		||||
        return Collections.emptyList();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static String processPattern(String pattern, TbMsg tbMsg) {
 | 
			
		||||
        try {
 | 
			
		||||
            String result = processPattern(pattern, tbMsg.getMetaData());
 | 
			
		||||
            JsonNode json = mapper.readTree(tbMsg.getData());
 | 
			
		||||
            if (json.isObject()) {
 | 
			
		||||
                Matcher matcher = DATA_PATTERN.matcher(result);
 | 
			
		||||
 | 
			
		||||
                while (matcher.find()) {
 | 
			
		||||
                    String group = matcher.group(2);
 | 
			
		||||
                    String[] keys = group.split("\\.");
 | 
			
		||||
                    JsonNode jsonNode = json;
 | 
			
		||||
                    for (String key : keys) {
 | 
			
		||||
                        if (StringUtils.isNotEmpty(key) && jsonNode != null) {
 | 
			
		||||
                            jsonNode = jsonNode.get(key);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            jsonNode = null;
 | 
			
		||||
                            break;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (jsonNode != null && !jsonNode.isObject() && !jsonNode.isArray()) {
 | 
			
		||||
                        result = result.replace(String.format(DATA_VARIABLE_TEMPLATE, group), jsonNode.asText());
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            return result;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            throw new RuntimeException("Failed to process pattern!", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static List<String> processPatterns(List<String> patterns, TbMsgMetaData metaData) {
 | 
			
		||||
        if (!CollectionUtils.isEmpty(patterns)) {
 | 
			
		||||
            return patterns.stream().map(p -> processPattern(p, metaData)).collect(Collectors.toList());
 | 
			
		||||
@ -53,15 +99,15 @@ public class TbNodeUtils {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static String processPattern(String pattern, TbMsgMetaData metaData) {
 | 
			
		||||
        String result = new String(pattern);
 | 
			
		||||
        for (Map.Entry<String,String> keyVal  : metaData.values().entrySet()) {
 | 
			
		||||
        String result = pattern;
 | 
			
		||||
        for (Map.Entry<String, String> keyVal : metaData.values().entrySet()) {
 | 
			
		||||
            result = processVar(result, keyVal.getKey(), keyVal.getValue());
 | 
			
		||||
        }
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static String processVar(String pattern, String key, String val) {
 | 
			
		||||
        String varPattern = String.format(VARIABLE_TEMPLATE, key);
 | 
			
		||||
        String varPattern = String.format(METADATA_VARIABLE_TEMPLATE, key);
 | 
			
		||||
        return pattern.replace(varPattern, val);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -79,7 +79,7 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
 | 
			
		||||
    protected abstract void doProcessCustomerAction(TbContext ctx, TbMsg msg, CustomerId customerId);
 | 
			
		||||
 | 
			
		||||
    protected ListenableFuture<CustomerId> getCustomer(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg.getMetaData());
 | 
			
		||||
        String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg);
 | 
			
		||||
        CustomerKey key = new CustomerKey(customerTitle);
 | 
			
		||||
        return ctx.getDbCallbackExecutor().executeAsync(() -> {
 | 
			
		||||
            Optional<CustomerId> customerId = customerIdCache.get(key);
 | 
			
		||||
 | 
			
		||||
@ -140,7 +140,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected String processPattern(TbMsg msg, String pattern) {
 | 
			
		||||
        return TbNodeUtils.processPattern(pattern, msg.getMetaData());
 | 
			
		||||
        return TbNodeUtils.processPattern(pattern, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data
 | 
			
		||||
 | 
			
		||||
@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
 | 
			
		||||
        String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
 | 
			
		||||
        ListenableFuture<Alarm> alarmFuture;
 | 
			
		||||
        if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
 | 
			
		||||
            alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
 | 
			
		||||
 | 
			
		||||
@ -70,7 +70,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
        final Alarm msgAlarm;
 | 
			
		||||
 | 
			
		||||
        if (!config.isUseMessageAlarmData()) {
 | 
			
		||||
            alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
 | 
			
		||||
            alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
 | 
			
		||||
            msgAlarm = null;
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
@ -151,7 +151,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
                .status(AlarmStatus.ACTIVE_UNACK)
 | 
			
		||||
                .severity(config.getSeverity())
 | 
			
		||||
                .propagate(config.isPropagate())
 | 
			
		||||
                .type(TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData()))
 | 
			
		||||
                .type(TbNodeUtils.processPattern(this.config.getAlarmType(), msg))
 | 
			
		||||
                .propagateRelationTypes(relationTypes)
 | 
			
		||||
                //todo-vp: alarm date should be taken from Message or current Time should be used?
 | 
			
		||||
//                .startTs(System.currentTimeMillis())
 | 
			
		||||
 | 
			
		||||
@ -83,7 +83,7 @@ public class TbSnsNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg.getMetaData());
 | 
			
		||||
        String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg);
 | 
			
		||||
        PublishRequest publishRequest = new PublishRequest()
 | 
			
		||||
                .withTopicArn(topicArn)
 | 
			
		||||
                .withMessage(msg.getData());
 | 
			
		||||
 | 
			
		||||
@ -91,14 +91,14 @@ public class TbSqsNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg.getMetaData());
 | 
			
		||||
        String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg);
 | 
			
		||||
        SendMessageRequest sendMsgRequest =  new SendMessageRequest();
 | 
			
		||||
        sendMsgRequest.withQueueUrl(queueUrl);
 | 
			
		||||
        sendMsgRequest.withMessageBody(msg.getData());
 | 
			
		||||
        Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
 | 
			
		||||
        this.config.getMessageAttributes().forEach((k,v) -> {
 | 
			
		||||
            String name = TbNodeUtils.processPattern(k, msg.getMetaData());
 | 
			
		||||
            String val = TbNodeUtils.processPattern(v, msg.getMetaData());
 | 
			
		||||
            String name = TbNodeUtils.processPattern(k, msg);
 | 
			
		||||
            String val = TbNodeUtils.processPattern(v, msg);
 | 
			
		||||
            messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val));
 | 
			
		||||
        });
 | 
			
		||||
        sendMsgRequest.setMessageAttributes(messageAttributes);
 | 
			
		||||
 | 
			
		||||
@ -84,7 +84,7 @@ public class TbMsgDelayNode implements TbNode {
 | 
			
		||||
        int periodInSeconds;
 | 
			
		||||
        if (config.isUseMetadataPeriodInSecondsPatterns()) {
 | 
			
		||||
            if (isParsable(msg, config.getPeriodInSecondsPattern())) {
 | 
			
		||||
                periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg.getMetaData()));
 | 
			
		||||
                periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg));
 | 
			
		||||
            } else {
 | 
			
		||||
                throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern());
 | 
			
		||||
            }
 | 
			
		||||
@ -95,7 +95,7 @@ public class TbMsgDelayNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isParsable(TbMsg msg, String pattern) {
 | 
			
		||||
        return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData()));
 | 
			
		||||
        return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -91,8 +91,8 @@ public class TbPubSubNode implements TbNode {
 | 
			
		||||
        PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder();
 | 
			
		||||
        pubsubMessageBuilder.setData(data);
 | 
			
		||||
        this.config.getMessageAttributes().forEach((k, v) -> {
 | 
			
		||||
            String name = TbNodeUtils.processPattern(k, msg.getMetaData());
 | 
			
		||||
            String val = TbNodeUtils.processPattern(v, msg.getMetaData());
 | 
			
		||||
            String name = TbNodeUtils.processPattern(k, msg);
 | 
			
		||||
            String val = TbNodeUtils.processPattern(v, msg);
 | 
			
		||||
            pubsubMessageBuilder.putAttributes(name, val);
 | 
			
		||||
        });
 | 
			
		||||
        ApiFuture<String> messageIdFuture = this.pubSubClient.publish(pubsubMessageBuilder.build());
 | 
			
		||||
 | 
			
		||||
@ -94,7 +94,7 @@ public class TbKafkaNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
 | 
			
		||||
        String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
 | 
			
		||||
        try {
 | 
			
		||||
            ctx.getExternalCallExecutor().executeAsync(() -> {
 | 
			
		||||
                publish(ctx, msg, topic);
 | 
			
		||||
 | 
			
		||||
@ -72,18 +72,18 @@ public class TbMsgToEmailNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private EmailPojo convert(TbMsg msg) throws IOException {
 | 
			
		||||
        EmailPojo.EmailPojoBuilder builder = EmailPojo.builder();
 | 
			
		||||
        builder.from(fromTemplate(this.config.getFromTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.to(fromTemplate(this.config.getToTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.cc(fromTemplate(this.config.getCcTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.bcc(fromTemplate(this.config.getBccTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.body(fromTemplate(this.config.getBodyTemplate(), msg.getMetaData()));
 | 
			
		||||
        builder.from(fromTemplate(this.config.getFromTemplate(), msg));
 | 
			
		||||
        builder.to(fromTemplate(this.config.getToTemplate(), msg));
 | 
			
		||||
        builder.cc(fromTemplate(this.config.getCcTemplate(), msg));
 | 
			
		||||
        builder.bcc(fromTemplate(this.config.getBccTemplate(), msg));
 | 
			
		||||
        builder.subject(fromTemplate(this.config.getSubjectTemplate(), msg));
 | 
			
		||||
        builder.body(fromTemplate(this.config.getBodyTemplate(), msg));
 | 
			
		||||
        return builder.build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String fromTemplate(String template, TbMsgMetaData metaData) {
 | 
			
		||||
    private String fromTemplate(String template, TbMsg msg) {
 | 
			
		||||
        if (!StringUtils.isEmpty(template)) {
 | 
			
		||||
            return TbNodeUtils.processPattern(template, metaData);
 | 
			
		||||
            return TbNodeUtils.processPattern(template, msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -92,10 +92,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
 | 
			
		||||
        }
 | 
			
		||||
        ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
 | 
			
		||||
        ListenableFuture<List<Void>> allFutures = Futures.allAsList(
 | 
			
		||||
                putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg.getMetaData()), failuresMap),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg.getMetaData()), failuresMap, "cs_"),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg.getMetaData()), failuresMap, "shared_"),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg.getMetaData()), failuresMap, "ss_")
 | 
			
		||||
                putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"),
 | 
			
		||||
                putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_")
 | 
			
		||||
        );
 | 
			
		||||
        withCallback(allFutures, i -> {
 | 
			
		||||
            if (!failuresMap.isEmpty()) {
 | 
			
		||||
 | 
			
		||||
@ -103,7 +103,7 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
                if (config.isUseMetadataIntervalPatterns()) {
 | 
			
		||||
                    checkMetadataKeyPatterns(msg);
 | 
			
		||||
                }
 | 
			
		||||
                List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg.getMetaData());
 | 
			
		||||
                List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
 | 
			
		||||
                ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys));
 | 
			
		||||
                DonAsynchron.withCallback(list, data -> {
 | 
			
		||||
                    process(data, msg, keys);
 | 
			
		||||
@ -197,10 +197,10 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
        Interval interval = new Interval();
 | 
			
		||||
        if (config.isUseMetadataIntervalPatterns()) {
 | 
			
		||||
            if (isParsable(msg, config.getStartIntervalPattern())) {
 | 
			
		||||
                interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg.getMetaData())));
 | 
			
		||||
                interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg)));
 | 
			
		||||
            }
 | 
			
		||||
            if (isParsable(msg, config.getEndIntervalPattern())) {
 | 
			
		||||
                interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg.getMetaData())));
 | 
			
		||||
                interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg)));
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            long ts = System.currentTimeMillis();
 | 
			
		||||
@ -211,7 +211,7 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isParsable(TbMsg msg, String pattern) {
 | 
			
		||||
        return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData()));
 | 
			
		||||
        return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkMetadataKeyPatterns(TbMsg msg) {
 | 
			
		||||
 | 
			
		||||
@ -76,7 +76,7 @@ public class TbMqttNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
 | 
			
		||||
        String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
 | 
			
		||||
        this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
 | 
			
		||||
                .addListener(future -> {
 | 
			
		||||
                            if (future.isSuccess()) {
 | 
			
		||||
 | 
			
		||||
@ -90,11 +90,11 @@ public class TbRabbitMqNode implements TbNode {
 | 
			
		||||
    private TbMsg publishMessage(TbContext ctx, TbMsg msg) throws Exception {
 | 
			
		||||
        String exchangeName = "";
 | 
			
		||||
        if (!StringUtils.isEmpty(this.config.getExchangeNamePattern())) {
 | 
			
		||||
            exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg.getMetaData());
 | 
			
		||||
            exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg);
 | 
			
		||||
        }
 | 
			
		||||
        String routingKey = "";
 | 
			
		||||
        if (!StringUtils.isEmpty(this.config.getRoutingKeyPattern())) {
 | 
			
		||||
            routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg.getMetaData());
 | 
			
		||||
            routingKey = TbNodeUtils.processPattern(this.config.getRoutingKeyPattern(), msg);
 | 
			
		||||
        }
 | 
			
		||||
        AMQP.BasicProperties properties = null;
 | 
			
		||||
        if (!StringUtils.isEmpty(this.config.getMessageProperties())) {
 | 
			
		||||
 | 
			
		||||
@ -172,8 +172,8 @@ public class TbHttpClient {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void processMessage(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData());
 | 
			
		||||
        HttpHeaders headers = prepareHeaders(msg.getMetaData());
 | 
			
		||||
        String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
 | 
			
		||||
        HttpHeaders headers = prepareHeaders(msg);
 | 
			
		||||
        HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
 | 
			
		||||
        HttpEntity<String> entity = new HttpEntity<>(msg.getData(), headers);
 | 
			
		||||
 | 
			
		||||
@ -232,9 +232,9 @@ public class TbHttpClient {
 | 
			
		||||
        return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private HttpHeaders prepareHeaders(TbMsgMetaData metaData) {
 | 
			
		||||
    private HttpHeaders prepareHeaders(TbMsg msg) {
 | 
			
		||||
        HttpHeaders headers = new HttpHeaders();
 | 
			
		||||
        config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, metaData), TbNodeUtils.processPattern(v, metaData)));
 | 
			
		||||
        config.getHeaders().forEach((k, v) -> headers.add(TbNodeUtils.processPattern(k, msg), TbNodeUtils.processPattern(v, msg)));
 | 
			
		||||
        ClientCredentials credentials = config.getCredentials();
 | 
			
		||||
        if (CredentialsType.BASIC == credentials.getType()) {
 | 
			
		||||
            BasicCredentials basicCredentials = (BasicCredentials) credentials;
 | 
			
		||||
 | 
			
		||||
@ -72,8 +72,8 @@ public class TbSendSmsNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendSms(TbContext ctx, TbMsg msg) throws Exception {
 | 
			
		||||
        String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg.getMetaData());
 | 
			
		||||
        String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg.getMetaData());
 | 
			
		||||
        String numbersTo = TbNodeUtils.processPattern(this.config.getNumbersToTemplate(), msg);
 | 
			
		||||
        String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg);
 | 
			
		||||
        String[] numbersToList = numbersTo.split(",");
 | 
			
		||||
        if (this.config.isUseSystemSmsSettings()) {
 | 
			
		||||
            ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user