From efad71b5ac3161a04ac4f867d064437d514bafdb Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Wed, 10 Feb 2021 17:06:20 +0200 Subject: [PATCH] Fix TbKafkaNode. Configure directly serializer class for key/values instead of string class name. --- .../server/queue/kafka/TbKafkaSettings.java | 12 ++++++++---- dao/src/test/resources/nosql-test.properties | 2 ++ dao/src/test/resources/sql-test.properties | 4 +++- .../rule/engine/kafka/TbKafkaNode.java | 19 +++++++++++++++++-- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 4c35c0ca1f..86738acea8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -22,6 +22,10 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -107,8 +111,8 @@ public class TbKafkaSettings { props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); return props; } @@ -120,8 +124,8 @@ public class TbKafkaSettings { props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return props; } diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties index eefa4f0b95..6fe2dc2112 100644 --- a/dao/src/test/resources/nosql-test.properties +++ b/dao/src/test/resources/nosql-test.properties @@ -6,6 +6,8 @@ sql.ts_inserts_fixed_thread_pool_size=10 spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true spring.jpa.properties.hibernate.order_by.default_null_ordering=last +spring.jpa.properties.hibernate.jdbc.log.warnings=false + spring.jpa.show-sql=false spring.jpa.hibernate.ddl-auto=none spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect diff --git a/dao/src/test/resources/sql-test.properties b/dao/src/test/resources/sql-test.properties index 058d7ac056..358b9ea3af 100644 --- a/dao/src/test/resources/sql-test.properties +++ b/dao/src/test/resources/sql-test.properties @@ -7,6 +7,8 @@ sql.ts_key_value_partitioning=MONTHS # spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true spring.jpa.properties.hibernate.order_by.default_null_ordering=last +spring.jpa.properties.hibernate.jdbc.log.warnings=false + spring.jpa.show-sql=false spring.jpa.hibernate.ddl-auto=validate spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect @@ -49,4 +51,4 @@ queue.rule-engine.queues[0].pack-processing-timeout=3000 queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES queue.rule-engine.queues[0].submit-strategy.type=BURST -sql.log_entity_queries=true \ No newline at end of file +sql.log_entity_queries=true diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index acc86b1e62..0ac03e1afb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -17,11 +17,13 @@ package org.thingsboard.rule.engine.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -73,8 +75,8 @@ public class TbKafkaNode implements TbNode { Properties properties = new Properties(); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getValueSerializer())); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getKeySerializer())); properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); @@ -92,6 +94,19 @@ public class TbKafkaNode implements TbNode { } } + private Class getKafkaSerializerClass(String serializerClassName) { + Class serializerClass = null; + if (!StringUtils.isEmpty(serializerClassName)) { + try { + serializerClass = Class.forName(serializerClassName); + } catch (ClassNotFoundException e) {} + } + if (serializerClass == null) { + serializerClass = StringSerializer.class; + } + return serializerClass; + } + @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);