From 46f5ce56a46f773984dced1f0a00a19fe48d1ede Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 27 Feb 2025 11:52:40 +0100 Subject: [PATCH] removed serializers from kafka node config, used default StringSerializer --- .../org/thingsboard/rule/engine/kafka/TbKafkaNode.java | 5 +++-- .../rule/engine/kafka/TbKafkaNodeConfiguration.java | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 6008305570..8f6a6dc617 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.util.ReflectionUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -83,8 +84,8 @@ public class TbKafkaNode extends TbAbstractExternalNode { Properties properties = new Properties(); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java index 867dd2e54e..b3f1fe1473 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.kafka; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; import org.apache.kafka.common.serialization.StringSerializer; import org.thingsboard.rule.engine.api.NodeConfiguration; @@ -23,6 +25,7 @@ import java.util.Collections; import java.util.Map; @Data +@JsonIgnoreProperties(ignoreUnknown = true) public class TbKafkaNodeConfiguration implements NodeConfiguration { private String topicPattern; @@ -33,8 +36,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration otherProperties; private boolean addMetadataKeyValuesAsKafkaHeaders; @@ -50,8 +51,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration