This commit is contained in:
Andrii Shvaika 2022-10-31 17:02:41 +02:00
commit 3acddcfcfa
2 changed files with 15 additions and 9 deletions

View File

@ -17,14 +17,11 @@ package org.thingsboard.rule.engine.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
@ -76,7 +73,7 @@ public class TbKafkaNode implements TbNode {
private boolean addMetadataKeyValuesAsKafkaHeaders;
private Charset toBytesCharset;
private Producer<?, String> producer;
private Producer<String, String> producer;
private Throwable initError;
@Override
@ -115,12 +112,20 @@ public class TbKafkaNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
String keyPattern = config.getKeyPattern();
try {
if (initError != null) {
ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage()));
} else {
ctx.getExternalCallExecutor().executeAsync(() -> {
publish(ctx, msg, topic);
publish(
ctx,
msg,
topic,
keyPattern == null || keyPattern.isEmpty()
? null
: TbNodeUtils.processPattern(config.getKeyPattern(), msg)
);
return null;
});
}
@ -129,16 +134,16 @@ public class TbKafkaNode implements TbNode {
}
}
protected void publish(TbContext ctx, TbMsg msg, String topic) {
protected void publish(TbContext ctx, TbMsg msg, String topic, String key) {
try {
if (!addMetadataKeyValuesAsKafkaHeaders) {
//TODO: external system executor
producer.send(new ProducerRecord<>(topic, msg.getData()),
producer.send(new ProducerRecord<>(topic, key, msg.getData()),
(metadata, e) -> processRecord(ctx, msg, metadata, e));
} else {
Headers headers = new RecordHeaders();
msg.getMetaData().values().forEach((key, value) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + key, value.getBytes(toBytesCharset))));
producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
msg.getMetaData().values().forEach((k, v) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + k, v.getBytes(toBytesCharset))));
producer.send(new ProducerRecord<>(topic, null, null, key, msg.getData(), headers),
(metadata, e) -> processRecord(ctx, msg, metadata, e));
}
} catch (Exception e) {

View File

@ -26,6 +26,7 @@ import java.util.Map;
public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeConfiguration> {
private String topicPattern;
private String keyPattern;
private String bootstrapServers;
private int retries;
private int batchSize;