Optional Key pattern field for Kafka rule node
This commit is contained in:
parent
d7aa2e5660
commit
694b8409d7
@ -17,14 +17,11 @@ package org.thingsboard.rule.engine.kafka;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
@ -76,7 +73,7 @@ public class TbKafkaNode implements TbNode {
|
||||
private boolean addMetadataKeyValuesAsKafkaHeaders;
|
||||
private Charset toBytesCharset;
|
||||
|
||||
private Producer<?, String> producer;
|
||||
private Producer<String, String> producer;
|
||||
private Throwable initError;
|
||||
|
||||
@Override
|
||||
@ -115,12 +112,20 @@ public class TbKafkaNode implements TbNode {
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
|
||||
String keyPattern = config.getKeyPattern();
|
||||
try {
|
||||
if (initError != null) {
|
||||
ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage()));
|
||||
} else {
|
||||
ctx.getExternalCallExecutor().executeAsync(() -> {
|
||||
publish(ctx, msg, topic);
|
||||
publish(
|
||||
ctx,
|
||||
msg,
|
||||
topic,
|
||||
keyPattern == null || keyPattern.isEmpty()
|
||||
? null
|
||||
: TbNodeUtils.processPattern(config.getKeyPattern(), msg)
|
||||
);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
@ -129,16 +134,16 @@ public class TbKafkaNode implements TbNode {
|
||||
}
|
||||
}
|
||||
|
||||
protected void publish(TbContext ctx, TbMsg msg, String topic) {
|
||||
protected void publish(TbContext ctx, TbMsg msg, String topic, String key) {
|
||||
try {
|
||||
if (!addMetadataKeyValuesAsKafkaHeaders) {
|
||||
//TODO: external system executor
|
||||
producer.send(new ProducerRecord<>(topic, msg.getData()),
|
||||
producer.send(new ProducerRecord<>(topic, key, msg.getData()),
|
||||
(metadata, e) -> processRecord(ctx, msg, metadata, e));
|
||||
} else {
|
||||
Headers headers = new RecordHeaders();
|
||||
msg.getMetaData().values().forEach((key, value) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + key, value.getBytes(toBytesCharset))));
|
||||
producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
|
||||
msg.getMetaData().values().forEach((k, v) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + k, v.getBytes(toBytesCharset))));
|
||||
producer.send(new ProducerRecord<>(topic, null, null, key, msg.getData(), headers),
|
||||
(metadata, e) -> processRecord(ctx, msg, metadata, e));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -26,6 +26,7 @@ import java.util.Map;
|
||||
public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeConfiguration> {
|
||||
|
||||
private String topicPattern;
|
||||
private String keyPattern;
|
||||
private String bootstrapServers;
|
||||
private int retries;
|
||||
private int batchSize;
|
||||
|
||||
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user