From 515dc983d3deff4e320eebd6219b6d378d04d24c Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Wed, 11 Mar 2020 18:07:42 +0200 Subject: [PATCH] Improvements/kafka rule node (#2505) * added metadata key-values as kafka headers * added default charset to configuration * fix typo --- .../rule/engine/kafka/TbKafkaNode.java | 44 ++++++++++++++----- .../kafka/TbKafkaNodeConfiguration.java | 5 +++ 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 216fea54e3..267e8711aa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -16,16 +16,21 @@ package org.thingsboard.rule.engine.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.*; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; @Slf4j @RuleNode( @@ -46,8 +51,11 @@ public class TbKafkaNode implements TbNode { private static final String PARTITION = "partition"; private static final String TOPIC = "topic"; private static final String ERROR = "error"; + public static final String TB_MSG_MD_PREFIX = "tb_msg_md_"; private TbKafkaNodeConfiguration config; + private boolean addMetadataKeyValuesAsKafkaHeaders; + private Charset toBytesCharset; private Producer producer; @@ -66,8 +74,10 @@ public class TbKafkaNode implements TbNode { properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory()); if (config.getOtherProperties() != null) { config.getOtherProperties() - .forEach((k,v) -> properties.put(k, v)); + .forEach(properties::put); } + addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); + toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; try { this.producer = new KafkaProducer<>(properties); } catch (Exception e) { @@ -79,16 +89,16 @@ public class TbKafkaNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); try { - producer.send(new ProducerRecord<>(topic, msg.getData()), - (metadata, e) -> { - if (metadata != null) { - TbMsg next = processResponse(ctx, msg, metadata); - ctx.tellNext(next, TbRelationTypes.SUCCESS); - } else { - TbMsg next = processException(ctx, msg, e); - ctx.tellFailure(next, e); - } - }); + if (!addMetadataKeyValuesAsKafkaHeaders) { + producer.send(new ProducerRecord<>(topic, msg.getData()), + (metadata, e) -> processRecord(ctx, msg, metadata, e)); + } else { + Headers headers = new RecordHeaders(); + msg.getMetaData().values().forEach((key, value) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + key, value.getBytes(toBytesCharset)))); + producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), + (metadata, e) -> processRecord(ctx, msg, metadata, e)); + } + } catch (Exception e) { ctx.tellFailure(msg, e); } @@ -105,6 +115,16 @@ public class TbKafkaNode implements TbNode { } } + private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) { + if (metadata != null) { + TbMsg next = processResponse(ctx, msg, metadata); + ctx.tellNext(next, TbRelationTypes.SUCCESS); + } else { + TbMsg next = processException(ctx, msg, e); + ctx.tellFailure(next, e); + } + } + private TbMsg processResponse(TbContext ctx, TbMsg origMsg, RecordMetadata recordMetadata) { TbMsgMetaData metaData = origMsg.getMetaData().copy(); metaData.putValue(OFFSET, String.valueOf(recordMetadata.offset())); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java index 1e8fe5b0c7..a1d4eedbb4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java @@ -36,6 +36,9 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration otherProperties; + private boolean addMetadataKeyValuesAsKafkaHeaders; + private String kafkaHeadersCharset; + @Override public TbKafkaNodeConfiguration defaultConfiguration() { TbKafkaNodeConfiguration configuration = new TbKafkaNodeConfiguration(); @@ -49,6 +52,8 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration