removed serializers from kafka node config, used default StringSerializer

This commit is contained in:
YevhenBondarenko 2025-02-27 11:52:40 +01:00
parent b9e8fae8ab
commit 46f5ce56a4
2 changed files with 6 additions and 6 deletions

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
@ -83,8 +84,8 @@ public class TbKafkaNode extends TbAbstractExternalNode {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.rule.engine.kafka; package org.thingsboard.rule.engine.kafka;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data; import lombok.Data;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.rule.engine.api.NodeConfiguration;
@ -23,6 +25,7 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
@Data @Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeConfiguration> { public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeConfiguration> {
private String topicPattern; private String topicPattern;
@ -33,8 +36,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
private int linger; private int linger;
private int bufferMemory; private int bufferMemory;
private String acks; private String acks;
private String keySerializer;
private String valueSerializer;
private Map<String, String> otherProperties; private Map<String, String> otherProperties;
private boolean addMetadataKeyValuesAsKafkaHeaders; private boolean addMetadataKeyValuesAsKafkaHeaders;
@ -50,8 +51,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
configuration.setLinger(0); configuration.setLinger(0);
configuration.setBufferMemory(33554432); configuration.setBufferMemory(33554432);
configuration.setAcks("-1"); configuration.setAcks("-1");
configuration.setKeySerializer(StringSerializer.class.getName());
configuration.setValueSerializer(StringSerializer.class.getName());
configuration.setOtherProperties(Collections.emptyMap()); configuration.setOtherProperties(Collections.emptyMap());
configuration.setAddMetadataKeyValuesAsKafkaHeaders(false); configuration.setAddMetadataKeyValuesAsKafkaHeaders(false);
configuration.setKafkaHeadersCharset("UTF-8"); configuration.setKafkaHeadersCharset("UTF-8");