diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index a5aa451bd1..99e9c562a7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -62,9 +62,6 @@ public class TbKafkaProducerTemplate implements TbQueuePro } this.settings = settings; - // Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class - // details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); this.producer = new KafkaProducer<>(props); this.defaultTopic = defaultTopic; this.admin = admin; diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardForkJoinWorkerThreadFactory.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardForkJoinWorkerThreadFactory.java index 319421f8bc..a4e56d579a 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardForkJoinWorkerThreadFactory.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardForkJoinWorkerThreadFactory.java @@ -34,8 +34,8 @@ public class ThingsBoardForkJoinWorkerThreadFactory implements ForkJoinPool.Fork @Override public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + thread.setContextClassLoader(this.getClass().getClassLoader()); thread.setName(namePrefix +"-"+thread.getPoolIndex()+"-"+threadNumber.getAndIncrement()); return thread; } - } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index df273dc8f8..49f41762ce 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -75,8 +75,8 @@ public class TbKafkaNode implements TbNode { Properties properties = new Properties(); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getValueSerializer())); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getKeySerializer())); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); @@ -88,28 +88,12 @@ public class TbKafkaNode implements TbNode { addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; try { - // Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class - // details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); this.producer = new KafkaProducer<>(properties); } catch (Exception e) { throw new TbNodeException(e); } } - private Class getKafkaSerializerClass(String serializerClassName) { - Class serializerClass = null; - if (!StringUtils.isEmpty(serializerClassName)) { - try { - serializerClass = Class.forName(serializerClassName); - } catch (ClassNotFoundException e) {} - } - if (serializerClass == null) { - serializerClass = StringSerializer.class; - } - return serializerClass; - } - @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);