moved properties creation to init() method

This commit is contained in:
IrynaMatveieva 2024-08-07 10:08:48 +03:00
parent 9bcfc75a77
commit 62699900bb
2 changed files with 30 additions and 36 deletions

View File

@ -81,7 +81,26 @@ public class TbKafkaNode extends TbAbstractExternalNode {
super.init(ctx); super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
this.initError = null; this.initError = null;
Properties properties = getKafkaProperties(ctx); Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getLinger());
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
if (config.getOtherProperties() != null) {
config.getOtherProperties().forEach((k, v) -> {
if (SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG.equals(k)
|| SslConfigs.SSL_KEYSTORE_KEY_CONFIG.equals(k)
|| SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG.equals(k)) {
v = v.replace("\\n", "\n");
}
properties.put(k, v);
});
}
addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
try { try {
@ -98,7 +117,7 @@ public class TbKafkaNode extends TbAbstractExternalNode {
} }
} }
protected KafkaProducer<String, String> getKafkaProducer(Properties properties) { KafkaProducer<String, String> getKafkaProducer(Properties properties) {
return new KafkaProducer<>(properties); return new KafkaProducer<>(properties);
} }
@ -145,30 +164,6 @@ public class TbKafkaNode extends TbAbstractExternalNode {
} }
} }
protected Properties getKafkaProperties(TbContext ctx) {
Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getLinger());
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
if (config.getOtherProperties() != null) {
config.getOtherProperties().forEach((k, v) -> {
if (SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG.equals(k)
|| SslConfigs.SSL_KEYSTORE_KEY_CONFIG.equals(k)
|| SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG.equals(k)) {
v = v.replace("\\n", "\n");
}
properties.put(k, v);
});
}
return properties;
}
@Override @Override
public void destroy() { public void destroy() {
if (this.producer != null) { if (this.producer != null) {

View File

@ -82,6 +82,7 @@ public class TbKafkaNodeTest {
private final long OFFSET = 1; private final long OFFSET = 1;
private final int PARTITION = 0; private final int PARTITION = 0;
private final String SERVICE_ID_STR = "test-service-id";
private final String TEST_TOPIC = "test-topic"; private final String TEST_TOPIC = "test-topic";
private final String TEST_KEY = "test-key"; private final String TEST_KEY = "test-key";
@ -144,7 +145,7 @@ public class TbKafkaNodeTest {
} }
@Test @Test
public void verifyGetKafkaPropertiesMethod() throws TbNodeException { public void verifyKafkaProperties() throws TbNodeException {
String sslKeyStoreCertificateChain = "cbdvch\\nfwrg\nvgwg\\n"; String sslKeyStoreCertificateChain = "cbdvch\\nfwrg\nvgwg\\n";
String sslKeyStoreKey = "nghmh\\nhmmnh\\\\ngreg\nvgwg\\n"; String sslKeyStoreKey = "nghmh\\nhmmnh\\\\ngreg\nvgwg\\n";
String sslTruststoreCertificates = "grthrt\fd\\nfwrg\nvgwg\\n"; String sslTruststoreCertificates = "grthrt\fd\\nfwrg\nvgwg\\n";
@ -155,16 +156,12 @@ public class TbKafkaNodeTest {
"ssl.protocol", "TLSv1.2" "ssl.protocol", "TLSv1.2"
)); ));
ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock); mockSuccessfulInit();
given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID);
String serviceIdStr = "test-service";
given(ctxMock.getServiceId()).willReturn(serviceIdStr);
willReturn(producerMock).given(node).getKafkaProducer(any());
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
Properties expectedProperties = new Properties(); Properties expectedProperties = new Properties();
expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + serviceIdStr); expectedProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + RULE_NODE_ID.getId() + "-" + SERVICE_ID_STR);
expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); expectedProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); expectedProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); expectedProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
@ -178,8 +175,9 @@ public class TbKafkaNodeTest {
expectedProperties.put("ssl.truststore.certificates", sslTruststoreCertificates.replace("\\n", "\n")); expectedProperties.put("ssl.truststore.certificates", sslTruststoreCertificates.replace("\\n", "\n"));
expectedProperties.put("ssl.protocol", "TLSv1.2"); expectedProperties.put("ssl.protocol", "TLSv1.2");
Properties actualsProperties = node.getKafkaProperties(ctxMock); ArgumentCaptor<Properties> properties = ArgumentCaptor.forClass(Properties.class);
assertThat(actualsProperties).isEqualTo(expectedProperties); then(node).should().getKafkaProducer(properties.capture());
assertThat(properties.getValue()).isEqualTo(expectedProperties);
} }
@Test @Test
@ -361,8 +359,9 @@ public class TbKafkaNodeTest {
} }
private void mockSuccessfulInit() { private void mockSuccessfulInit() {
given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID);
given(ctxMock.getServiceId()).willReturn(SERVICE_ID_STR);
ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock); ReflectionTestUtils.setField(producerMock, "ioThread", ioThreadMock);
willReturn(mock(Properties.class)).given(node).getKafkaProperties(ctxMock);
willReturn(producerMock).given(node).getKafkaProducer(any()); willReturn(producerMock).given(node).getKafkaProducer(any());
} }