diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index e1a45e74d8..8d2cf4b466 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -26,6 +26,7 @@ kafka: servers: "TB_KAFKA_SERVERS" replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge + requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS" topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 8983b68368..84e927378a 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -25,6 +25,8 @@ kafka: # Kafka Bootstrap Servers servers: "localhost:9092" replication_factor: "1" + acks: "1" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge + requestTimeout: "30000" # The default value in kafkajs is: 30000 topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" use_confluent_cloud: false client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index deee3b95b9..fb56630471 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -19,10 +19,11 @@ const config = require('config'), JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), logger = require('../config/logger')._logger('kafkaTemplate'), KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; -const replicationFactor = config.get('kafka.replication_factor'); +const replicationFactor = Number(config.get('kafka.replication_factor')); const topicProperties = config.get('kafka.topic_properties'); const kafkaClientId = config.get('kafka.client_id'); -const acks = config.get('kafka.acks'); +const acks = Number(config.get('kafka.acks')); +const requestTimeout = Number(config.get('kafka.requestTimeout')); let kafkaClient; let kafkaAdmin; @@ -72,6 +73,8 @@ function KafkaProducer() { logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); } + kafkaConfig['requestTimeout'] = requestTimeout; + if (useConfluent) { kafkaConfig['sasl'] = { mechanism: config.get('kafka.confluent.sasl.mechanism'), @@ -117,6 +120,7 @@ function KafkaProducer() { logger.info('Started ThingsBoard JavaScript Executor Microservice.'); await consumer.run({ + //partitionsConsumedConcurrently: 1, // Default: 1 eachMessage: async ({topic, partition, message}) => { let headers = message.headers; let key = message.key;