Refactored
This commit is contained in:
parent
acc9257af7
commit
eae41d112f
@ -55,16 +55,16 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
|
|||||||
|
|
||||||
@Builder
|
@Builder
|
||||||
private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
|
private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
|
||||||
// Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class
|
|
||||||
// details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class
|
|
||||||
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
|
|
||||||
|
|
||||||
Properties props = settings.toProducerProps();
|
Properties props = settings.toProducerProps();
|
||||||
|
|
||||||
if (!StringUtils.isEmpty(clientId)) {
|
if (!StringUtils.isEmpty(clientId)) {
|
||||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
|
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
|
||||||
}
|
}
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
|
|
||||||
|
// Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class
|
||||||
|
// details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class
|
||||||
|
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
|
||||||
this.producer = new KafkaProducer<>(props);
|
this.producer = new KafkaProducer<>(props);
|
||||||
this.defaultTopic = defaultTopic;
|
this.defaultTopic = defaultTopic;
|
||||||
this.admin = admin;
|
this.admin = admin;
|
||||||
|
|||||||
@ -71,10 +71,6 @@ public class TbKafkaNode implements TbNode {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
// Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class
|
|
||||||
// details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class
|
|
||||||
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
|
|
||||||
|
|
||||||
this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
|
this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
|
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
|
||||||
@ -92,6 +88,9 @@ public class TbKafkaNode implements TbNode {
|
|||||||
addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
|
addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
|
||||||
toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
|
toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
|
||||||
try {
|
try {
|
||||||
|
// Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class
|
||||||
|
// details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class
|
||||||
|
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
|
||||||
this.producer = new KafkaProducer<>(properties);
|
this.producer = new KafkaProducer<>(properties);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new TbNodeException(e);
|
throw new TbNodeException(e);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user