From 46f5ce56a46f773984dced1f0a00a19fe48d1ede Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 27 Feb 2025 11:52:40 +0100 Subject: [PATCH 1/4] removed serializers from kafka node config, used default StringSerializer --- .../org/thingsboard/rule/engine/kafka/TbKafkaNode.java | 5 +++-- .../rule/engine/kafka/TbKafkaNodeConfiguration.java | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 6008305570..8f6a6dc617 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.util.ReflectionUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -83,8 +84,8 @@ public class TbKafkaNode extends TbAbstractExternalNode { Properties properties = new Properties(); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java index 867dd2e54e..b3f1fe1473 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.kafka; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; import org.apache.kafka.common.serialization.StringSerializer; import org.thingsboard.rule.engine.api.NodeConfiguration; @@ -23,6 +25,7 @@ import java.util.Collections; import java.util.Map; @Data +@JsonIgnoreProperties(ignoreUnknown = true) public class TbKafkaNodeConfiguration implements NodeConfiguration { private String topicPattern; @@ -33,8 +36,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration otherProperties; private boolean addMetadataKeyValuesAsKafkaHeaders; @@ -50,8 +51,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration Date: Fri, 28 Feb 2025 10:03:23 +0200 Subject: [PATCH 2/4] UI: Remove key and value serializer form kafka node --- .../rule-node/external/kafka-config.component.html | 14 -------------- .../rule-node/external/kafka-config.component.ts | 2 -- .../src/assets/locale/locale.constant-en_US.json | 4 ---- 3 files changed, 20 deletions(-) diff --git a/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html b/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html index 3aa743247f..1132c9982c 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html @@ -73,20 +73,6 @@ - - rule-node-config.key-serializer - - - {{ 'rule-node-config.key-serializer-required' | translate }} - - - - rule-node-config.value-serializer - - - {{ 'rule-node-config.value-serializer-required' | translate }} - - Date: Sat, 8 Mar 2025 20:54:08 +0100 Subject: [PATCH 3/4] added upgrade for the kafka node --- .../rule/engine/kafka/TbKafkaNode.java | 22 +++++ .../rule/engine/kafka/TbKafkaNodeTest.java | 81 +++++++++++++++++-- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 8f6a6dc617..d544d0647d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.kafka; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.BooleanUtils; import org.apache.kafka.clients.producer.KafkaProducer; @@ -36,6 +38,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -49,6 +52,7 @@ import java.util.Properties; type = ComponentType.EXTERNAL, name = "kafka", configClazz = TbKafkaNodeConfiguration.class, + version = 1, nodeDescription = "Publish messages to Kafka server", nodeDetails = "Will send record via Kafka producer to Kafka server. " + "Outbound message will contain response fields (offset, partition, topic)" + @@ -201,4 +205,22 @@ public class TbKafkaNode extends TbAbstractExternalNode { .build(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0 -> { + if (oldConfiguration.has("keySerializer") || oldConfiguration.has("valueSerializer")) { + ObjectNode objectConfiguration = (ObjectNode) oldConfiguration; + objectConfiguration.remove("keySerializer"); + objectConfiguration.remove("valueSerializer"); + hasChanges = true; + } + } + default -> { + } + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java index de81d55fdb..486e707571 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java @@ -39,8 +39,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -73,7 +75,7 @@ import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; @ExtendWith(MockitoExtension.class) -public class TbKafkaNodeTest { +public class TbKafkaNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("d46bb666-ecab-4d89-a28f-5abdca23ac29")); @@ -117,8 +119,6 @@ public class TbKafkaNodeTest { assertThat(config.getLinger()).isEqualTo(0); assertThat(config.getBufferMemory()).isEqualTo(33554432); assertThat(config.getAcks()).isEqualTo("-1"); - assertThat(config.getKeySerializer()).isEqualTo(StringSerializer.class.getName()); - assertThat(config.getValueSerializer()).isEqualTo(StringSerializer.class.getName()); assertThat(config.getOtherProperties()).isEmpty(); assertThat(config.isAddMetadataKeyValuesAsKafkaHeaders()).isFalse(); assertThat(config.getKafkaHeadersCharset()).isEqualTo("UTF-8"); @@ -163,8 +163,8 @@ public class TbKafkaNodeTest { Properties expectedProperties = new Properties(); expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + SERVICE_ID_STR); expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); - expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); + expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); expectedProperties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); expectedProperties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); expectedProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); @@ -454,4 +454,75 @@ public class TbKafkaNodeTest { assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + //config for version 0 + Arguments.of(0, + "{\n" + + " \"topicPattern\": \"test-topic\",\n" + + " \"keyPattern\": \"test-key\",\n" + + " \"bootstrapServers\": \"localhost:9092\",\n" + + " \"retries\": 0,\n" + + " \"batchSize\": 16384,\n" + + " \"linger\": 0,\n" + + " \"bufferMemory\": 33554432,\n" + + " \"acks\": \"-1\",\n" + + " \"otherProperties\": {},\n" + + " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" + + " \"kafkaHeadersCharset\": \"UTF-8\",\n" + + " \"keySerializer\": \"org.apache.kafka.common.serialization.StringSerializer\",\n" + + " \"valueSerializer\": \"org.apache.kafka.common.serialization.StringSerializer\"\n" + + "}", + true, + "{\n" + + " \"topicPattern\": \"test-topic\",\n" + + " \"keyPattern\": \"test-key\",\n" + + " \"bootstrapServers\": \"localhost:9092\",\n" + + " \"retries\": 0,\n" + + " \"batchSize\": 16384,\n" + + " \"linger\": 0,\n" + + " \"bufferMemory\": 33554432,\n" + + " \"acks\": \"-1\",\n" + + " \"otherProperties\": {},\n" + + " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" + + " \"kafkaHeadersCharset\": \"UTF-8\"\n" + + "}" + ), + //config for version 1 with upgrade from version 0 + Arguments.of(1, + "{\n" + + " \"topicPattern\": \"test-topic\",\n" + + " \"keyPattern\": \"test-key\",\n" + + " \"bootstrapServers\": \"localhost:9092\",\n" + + " \"retries\": 0,\n" + + " \"batchSize\": 16384,\n" + + " \"linger\": 0,\n" + + " \"bufferMemory\": 33554432,\n" + + " \"acks\": \"-1\",\n" + + " \"otherProperties\": {},\n" + + " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" + + " \"kafkaHeadersCharset\": \"UTF-8\"\n" + + "}", + false, + "{\n" + + " \"topicPattern\": \"test-topic\",\n" + + " \"keyPattern\": \"test-key\",\n" + + " \"bootstrapServers\": \"localhost:9092\",\n" + + " \"retries\": 0,\n" + + " \"batchSize\": 16384,\n" + + " \"linger\": 0,\n" + + " \"bufferMemory\": 33554432,\n" + + " \"acks\": \"-1\",\n" + + " \"otherProperties\": {},\n" + + " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" + + " \"kafkaHeadersCharset\": \"UTF-8\"\n" + + "}" + ) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } } From aa2df212df19d119f9dd6208177f7949e027627f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 10 Mar 2025 12:42:29 +0100 Subject: [PATCH 4/4] refactoring accorrding to the comments --- .../rule/engine/kafka/TbKafkaNodeConfiguration.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java index b3f1fe1473..9a31e58ce7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java @@ -15,17 +15,13 @@ */ package org.thingsboard.rule.engine.kafka; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; -import org.apache.kafka.common.serialization.StringSerializer; import org.thingsboard.rule.engine.api.NodeConfiguration; import java.util.Collections; import java.util.Map; @Data -@JsonIgnoreProperties(ignoreUnknown = true) public class TbKafkaNodeConfiguration implements NodeConfiguration { private String topicPattern;