diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index b9c7d53b1d..fa7efd2dac 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -17,14 +17,11 @@ package org.thingsboard.rule.engine.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.BooleanUtils; -import org.thingsboard.server.common.data.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -76,7 +73,7 @@ public class TbKafkaNode implements TbNode { private boolean addMetadataKeyValuesAsKafkaHeaders; private Charset toBytesCharset; - private Producer producer; + private Producer producer; private Throwable initError; @Override @@ -115,12 +112,20 @@ public class TbKafkaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); + String keyPattern = config.getKeyPattern(); try { if (initError != null) { ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage())); } else { ctx.getExternalCallExecutor().executeAsync(() -> { - publish(ctx, msg, topic); + publish( + ctx, + msg, + topic, + keyPattern == null || keyPattern.isEmpty() + ? null + : TbNodeUtils.processPattern(config.getKeyPattern(), msg) + ); return null; }); } @@ -129,16 +134,16 @@ public class TbKafkaNode implements TbNode { } } - protected void publish(TbContext ctx, TbMsg msg, String topic) { + protected void publish(TbContext ctx, TbMsg msg, String topic, String key) { try { if (!addMetadataKeyValuesAsKafkaHeaders) { //TODO: external system executor - producer.send(new ProducerRecord<>(topic, msg.getData()), + producer.send(new ProducerRecord<>(topic, key, msg.getData()), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } else { Headers headers = new RecordHeaders(); - msg.getMetaData().values().forEach((key, value) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + key, value.getBytes(toBytesCharset)))); - producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), + msg.getMetaData().values().forEach((k, v) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + k, v.getBytes(toBytesCharset)))); + producer.send(new ProducerRecord<>(topic, null, null, key, msg.getData(), headers), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } } catch (Exception e) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java index 2db833b6a2..0e696f3c25 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java @@ -26,6 +26,7 @@ import java.util.Map; public class TbKafkaNodeConfiguration implements NodeConfiguration { private String topicPattern; + private String keyPattern; private String bootstrapServers; private int retries; private int batchSize;