Merge pull request #12774 from YevhenBondarenko/fix/prod-5664
[Kafka node] Removed key/value serializers from config
Commit e74e83b0ba
TbKafkaNode.java

@@ -15,6 +15,8 @@
  */
 package org.thingsboard.rule.engine.kafka;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -26,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.util.ReflectionUtils;
 import org.thingsboard.rule.engine.api.RuleNode;
 import org.thingsboard.rule.engine.api.TbContext;
@@ -35,6 +38,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
 import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
 import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.data.util.TbPair;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -48,6 +52,7 @@ import java.util.Properties;
         type = ComponentType.EXTERNAL,
         name = "kafka",
         configClazz = TbKafkaNodeConfiguration.class,
+        version = 1,
         nodeDescription = "Publish messages to Kafka server",
         nodeDetails = "Will send record via Kafka producer to Kafka server. " +
                 "Outbound message will contain response fields (<code>offset</code>, <code>partition</code>, <code>topic</code>)" +
@@ -83,8 +88,8 @@ public class TbKafkaNode extends TbAbstractExternalNode {
         Properties properties = new Properties();
         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
         properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
         properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
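Since the serializer settings are no longer part of the node configuration, the producer this node builds is always effectively a KafkaProducer<String, String>. A minimal standalone sketch of the resulting producer setup (illustrative only, not code from this change; the class name and bootstrap address are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

class FixedSerializerProducerSketch {
    static KafkaProducer<String, String> buildProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // still taken from the node config
        // Key and value serializers are hard-coded now; they are no longer read from TbKafkaNodeConfiguration.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(props);
    }
}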
@@ -200,4 +205,22 @@ public class TbKafkaNode extends TbAbstractExternalNode {
                 .build();
     }
 
+    @Override
+    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
+        boolean hasChanges = false;
+        switch (fromVersion) {
+            case 0 -> {
+                if (oldConfiguration.has("keySerializer") || oldConfiguration.has("valueSerializer")) {
+                    ObjectNode objectConfiguration = (ObjectNode) oldConfiguration;
+                    objectConfiguration.remove("keySerializer");
+                    objectConfiguration.remove("valueSerializer");
+                    hasChanges = true;
+                }
+            }
+            default -> {
+            }
+        }
+        return new TbPair<>(hasChanges, oldConfiguration);
+    }
+
 }
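A short sketch of what the new upgrade hook does to a stored version-0 configuration (illustrative only; the sample JSON and the TbPair accessors getFirst()/getSecond() are assumptions, not shown in this diff):

package org.thingsboard.rule.engine.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.common.data.util.TbPair;

class KafkaNodeUpgradeSketch {
    static void demo() throws Exception {
        // A version-0 configuration that still carries a serializer key.
        JsonNode oldConfig = new ObjectMapper().readTree(
                "{\"acks\":\"-1\",\"keySerializer\":\"org.apache.kafka.common.serialization.StringSerializer\"}");
        TbPair<Boolean, JsonNode> result = new TbKafkaNode().upgrade(0, oldConfig);
        System.out.println(result.getFirst());   // true -> the configuration was changed
        System.out.println(result.getSecond());  // {"acks":"-1"} -> serializer keys removed
    }
}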
TbKafkaNodeConfiguration.java

@@ -16,7 +16,6 @@
 package org.thingsboard.rule.engine.kafka;
 
 import lombok.Data;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.thingsboard.rule.engine.api.NodeConfiguration;
 
 import java.util.Collections;
@@ -33,8 +32,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
     private int linger;
     private int bufferMemory;
     private String acks;
-    private String keySerializer;
-    private String valueSerializer;
     private Map<String, String> otherProperties;
 
     private boolean addMetadataKeyValuesAsKafkaHeaders;
@@ -50,8 +47,6 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
         configuration.setLinger(0);
         configuration.setBufferMemory(33554432);
         configuration.setAcks("-1");
-        configuration.setKeySerializer(StringSerializer.class.getName());
-        configuration.setValueSerializer(StringSerializer.class.getName());
         configuration.setOtherProperties(Collections.emptyMap());
         configuration.setAddMetadataKeyValuesAsKafkaHeaders(false);
         configuration.setKafkaHeadersCharset("UTF-8");
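For reference, a sketch of the defaults that remain once the serializer fields are gone, based only on the defaultConfiguration() lines visible in the hunk above (getter names follow from Lombok @Data; defaults outside the visible hunk, such as topicPattern or retries, are not listed):

package org.thingsboard.rule.engine.kafka;

class KafkaDefaultsSketch {
    static void demo() {
        TbKafkaNodeConfiguration defaults = new TbKafkaNodeConfiguration().defaultConfiguration();
        System.out.println(defaults.getAcks());                // "-1"
        System.out.println(defaults.getLinger());              // 0
        System.out.println(defaults.getBufferMemory());        // 33554432
        System.out.println(defaults.getOtherProperties());     // {} (empty map)
        System.out.println(defaults.getKafkaHeadersCharset()); // "UTF-8"
        // getKeySerializer()/getValueSerializer() no longer exist on the class.
    }
}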
TbKafkaNodeTest.java

@@ -39,8 +39,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.test.util.ReflectionTestUtils;
 import org.thingsboard.common.util.JacksonUtil;
 import org.thingsboard.common.util.ListeningExecutor;
+import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
 import org.thingsboard.rule.engine.TestDbCallbackExecutor;
 import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@@ -73,7 +75,7 @@ import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.BDDMockito.willThrow;
 
 @ExtendWith(MockitoExtension.class)
-public class TbKafkaNodeTest {
+public class TbKafkaNodeTest extends AbstractRuleNodeUpgradeTest {
 
     private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf"));
     private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("d46bb666-ecab-4d89-a28f-5abdca23ac29"));
@@ -117,8 +119,6 @@ public class TbKafkaNodeTest {
         assertThat(config.getLinger()).isEqualTo(0);
         assertThat(config.getBufferMemory()).isEqualTo(33554432);
         assertThat(config.getAcks()).isEqualTo("-1");
-        assertThat(config.getKeySerializer()).isEqualTo(StringSerializer.class.getName());
-        assertThat(config.getValueSerializer()).isEqualTo(StringSerializer.class.getName());
         assertThat(config.getOtherProperties()).isEmpty();
         assertThat(config.isAddMetadataKeyValuesAsKafkaHeaders()).isFalse();
         assertThat(config.getKafkaHeadersCharset()).isEqualTo("UTF-8");
@@ -163,8 +163,8 @@ public class TbKafkaNodeTest {
         Properties expectedProperties = new Properties();
         expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + SERVICE_ID_STR);
         expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
-        expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
-        expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
+        expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         expectedProperties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
         expectedProperties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
         expectedProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
@@ -454,4 +454,75 @@ public class TbKafkaNodeTest {
         assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
     }
 
+    private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
+        return Stream.of(
+                //config for version 0
+                Arguments.of(0,
+                        "{\n" +
+                                " \"topicPattern\": \"test-topic\",\n" +
+                                " \"keyPattern\": \"test-key\",\n" +
+                                " \"bootstrapServers\": \"localhost:9092\",\n" +
+                                " \"retries\": 0,\n" +
+                                " \"batchSize\": 16384,\n" +
+                                " \"linger\": 0,\n" +
+                                " \"bufferMemory\": 33554432,\n" +
+                                " \"acks\": \"-1\",\n" +
+                                " \"otherProperties\": {},\n" +
+                                " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                " \"kafkaHeadersCharset\": \"UTF-8\",\n" +
+                                " \"keySerializer\": \"org.apache.kafka.common.serialization.StringSerializer\",\n" +
+                                " \"valueSerializer\": \"org.apache.kafka.common.serialization.StringSerializer\"\n" +
+                                "}",
+                        true,
+                        "{\n" +
+                                " \"topicPattern\": \"test-topic\",\n" +
+                                " \"keyPattern\": \"test-key\",\n" +
+                                " \"bootstrapServers\": \"localhost:9092\",\n" +
+                                " \"retries\": 0,\n" +
+                                " \"batchSize\": 16384,\n" +
+                                " \"linger\": 0,\n" +
+                                " \"bufferMemory\": 33554432,\n" +
+                                " \"acks\": \"-1\",\n" +
+                                " \"otherProperties\": {},\n" +
+                                " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}"
+                ),
+                //config for version 1 with upgrade from version 0
+                Arguments.of(1,
+                        "{\n" +
+                                " \"topicPattern\": \"test-topic\",\n" +
+                                " \"keyPattern\": \"test-key\",\n" +
+                                " \"bootstrapServers\": \"localhost:9092\",\n" +
+                                " \"retries\": 0,\n" +
+                                " \"batchSize\": 16384,\n" +
+                                " \"linger\": 0,\n" +
+                                " \"bufferMemory\": 33554432,\n" +
+                                " \"acks\": \"-1\",\n" +
+                                " \"otherProperties\": {},\n" +
+                                " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}",
+                        false,
+                        "{\n" +
+                                " \"topicPattern\": \"test-topic\",\n" +
+                                " \"keyPattern\": \"test-key\",\n" +
+                                " \"bootstrapServers\": \"localhost:9092\",\n" +
+                                " \"retries\": 0,\n" +
+                                " \"batchSize\": 16384,\n" +
+                                " \"linger\": 0,\n" +
+                                " \"bufferMemory\": 33554432,\n" +
+                                " \"acks\": \"-1\",\n" +
+                                " \"otherProperties\": {},\n" +
+                                " \"addMetadataKeyValuesAsKafkaHeaders\": false,\n" +
+                                " \"kafkaHeadersCharset\": \"UTF-8\"\n" +
+                                "}"
+                )
+        );
+    }
+
+    @Override
+    protected TbNode getTestNode() {
+        return node;
+    }
 }
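The assertions that consume this data provider live in AbstractRuleNodeUpgradeTest, which is not part of this diff. A rough, hypothetical standalone equivalent (the method name, parameter order, imports, and the JacksonUtil.toJsonNode and TbPair accessor calls are assumptions) would look like:

@ParameterizedTest
@MethodSource("givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig")
void upgradeStripsSerializerKeys(int fromVersion, String oldConfig,
                                 boolean expectedHasChanges, String expectedConfig) throws TbNodeException {
    // "node" is the TbKafkaNode instance that getTestNode() above returns
    TbPair<Boolean, JsonNode> result = node.upgrade(fromVersion, JacksonUtil.toJsonNode(oldConfig));
    assertThat(result.getFirst()).isEqualTo(expectedHasChanges);
    assertThat(result.getSecond()).isEqualTo(JacksonUtil.toJsonNode(expectedConfig));
}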
KafkaConfigComponent template (Angular HTML)

@@ -73,20 +73,6 @@
       </mat-option>
     </mat-select>
   </mat-form-field>
-  <mat-form-field class="mat-block">
-    <mat-label translate>rule-node-config.key-serializer</mat-label>
-    <input required matInput formControlName="keySerializer">
-    <mat-error *ngIf="kafkaConfigForm.get('keySerializer').hasError('required')">
-      {{ 'rule-node-config.key-serializer-required' | translate }}
-    </mat-error>
-  </mat-form-field>
-  <mat-form-field class="mat-block">
-    <mat-label translate>rule-node-config.value-serializer</mat-label>
-    <input required matInput formControlName="valueSerializer">
-    <mat-error *ngIf="kafkaConfigForm.get('valueSerializer').hasError('required')">
-      {{ 'rule-node-config.value-serializer-required' | translate }}
-    </mat-error>
-  </mat-form-field>
   <label translate class="tb-title">rule-node-config.other-properties</label>
   <tb-kv-map-config-old
     required="false"
KafkaConfigComponent (Angular TypeScript)

@@ -54,8 +54,6 @@ export class KafkaConfigComponent extends RuleNodeConfigurationComponent {
       linger: [configuration ? configuration.linger : null, [Validators.min(0)]],
       bufferMemory: [configuration ? configuration.bufferMemory : null, [Validators.min(0)]],
       acks: [configuration ? configuration.acks : null, [Validators.required]],
-      keySerializer: [configuration ? configuration.keySerializer : null, [Validators.required]],
-      valueSerializer: [configuration ? configuration.valueSerializer : null, [Validators.required]],
      otherProperties: [configuration ? configuration.otherProperties : null, []],
      addMetadataKeyValuesAsKafkaHeaders: [configuration ? configuration.addMetadataKeyValuesAsKafkaHeaders : false, []],
      kafkaHeadersCharset: [configuration ? configuration.kafkaHeadersCharset : null, []]
English locale (rule-node-config translation keys)

@@ -4838,10 +4838,6 @@
     "min-buffer-memory-message": "Only 0 minimum buffer size is allowed.",
     "memory-buffer-size-range": "Memory buffer size must be between 0 and {{max}} KB",
     "acks": "Number of acknowledgments",
-    "key-serializer": "Key serializer",
-    "key-serializer-required": "Key serializer is required",
-    "value-serializer": "Value serializer",
-    "value-serializer-required": "Value serializer is required",
     "topic-arn-pattern": "Topic ARN pattern",
     "topic-arn-pattern-required": "Topic ARN pattern is required",
     "aws-access-key-id": "AWS Access Key ID",