diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
index 6008305570..d544d0647d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.kafka;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -26,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.util.ReflectionUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@@ -35,6 +38,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -48,6 +52,7 @@ import java.util.Properties;
type = ComponentType.EXTERNAL,
name = "kafka",
configClazz = TbKafkaNodeConfiguration.class,
+ version = 1,
nodeDescription = "Publish messages to Kafka server",
nodeDetails = "Will send record via Kafka producer to Kafka server. " +
"Outbound message will contain response fields (offset, partition, topic)" +
@@ -83,8 +88,8 @@ public class TbKafkaNode extends TbAbstractExternalNode {
Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
@@ -200,4 +205,22 @@ public class TbKafkaNode extends TbAbstractExternalNode {
.build();
}
+ @Override
+ public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
+ boolean hasChanges = false;
+ switch (fromVersion) {
+ case 0 -> {
+ if (oldConfiguration.has("keySerializer") || oldConfiguration.has("valueSerializer")) {
+ ObjectNode objectConfiguration = (ObjectNode) oldConfiguration;
+ objectConfiguration.remove("keySerializer");
+ objectConfiguration.remove("valueSerializer");
+ hasChanges = true;
+ }
+ }
+ default -> {
+ }
+ }
+ return new TbPair<>(hasChanges, oldConfiguration);
+ }
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java
index 867dd2e54e..9a31e58ce7 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeConfiguration.java
@@ -16,7 +16,6 @@
package org.thingsboard.rule.engine.kafka;
import lombok.Data;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import java.util.Collections;
@@ -33,8 +32,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration otherProperties;
private boolean addMetadataKeyValuesAsKafkaHeaders;
@@ -50,8 +47,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
+ return Stream.of(
+ //config for version 0
+ Arguments.of(0,
+ "{\n" +
+ " \"topicPattern\": \"test-topic\",\n" +
+ " \"keyPattern\": \"test-key\",\n" +
+ " \"bootstrapServers\": \"localhost:9092\",\n" +
+ " \"retries\": 0,\n" +
+ " \"batchSize\": 16384,\n" +
+ " \"linger\": 0,\n" +
+ " \"bufferMemory\": 33554432,\n" +
+ " \"acks\": \"-1\",\n" +
+ " \"otherProperties\": {},\n" +
+ " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+ " \"kafkaHeadersCharset\": \"UTF-8\",\n" +
+ " \"keySerializer\": \"org.apache.kafka.common.serialization.StringSerializer\",\n" +
+ " \"valueSerializer\": \"org.apache.kafka.common.serialization.StringSerializer\"\n" +
+ "}",
+ true,
+ "{\n" +
+ " \"topicPattern\": \"test-topic\",\n" +
+ " \"keyPattern\": \"test-key\",\n" +
+ " \"bootstrapServers\": \"localhost:9092\",\n" +
+ " \"retries\": 0,\n" +
+ " \"batchSize\": 16384,\n" +
+ " \"linger\": 0,\n" +
+ " \"bufferMemory\": 33554432,\n" +
+ " \"acks\": \"-1\",\n" +
+ " \"otherProperties\": {},\n" +
+ " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+ " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+ "}"
+ ),
+ //config for version 1 with upgrade from version 0
+ Arguments.of(1,
+ "{\n" +
+ " \"topicPattern\": \"test-topic\",\n" +
+ " \"keyPattern\": \"test-key\",\n" +
+ " \"bootstrapServers\": \"localhost:9092\",\n" +
+ " \"retries\": 0,\n" +
+ " \"batchSize\": 16384,\n" +
+ " \"linger\": 0,\n" +
+ " \"bufferMemory\": 33554432,\n" +
+ " \"acks\": \"-1\",\n" +
+ " \"otherProperties\": {},\n" +
+ " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+ " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+ "}",
+ false,
+ "{\n" +
+ " \"topicPattern\": \"test-topic\",\n" +
+ " \"keyPattern\": \"test-key\",\n" +
+ " \"bootstrapServers\": \"localhost:9092\",\n" +
+ " \"retries\": 0,\n" +
+ " \"batchSize\": 16384,\n" +
+ " \"linger\": 0,\n" +
+ " \"bufferMemory\": 33554432,\n" +
+ " \"acks\": \"-1\",\n" +
+ " \"otherProperties\": {},\n" +
+ " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+ " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+ "}"
+ )
+ );
+ }
+
+ @Override
+ protected TbNode getTestNode() {
+ return node;
+ }
}
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html b/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html
index 3aa743247f..1132c9982c 100644
--- a/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html
+++ b/ui-ngx/src/app/modules/home/components/rule-node/external/kafka-config.component.html
@@ -73,20 +73,6 @@
-
- rule-node-config.key-serializer
-
-
- {{ 'rule-node-config.key-serializer-required' | translate }}
-
-
-
- rule-node-config.value-serializer
-
-
- {{ 'rule-node-config.value-serializer-required' | translate }}
-
-