remote js-executor: parameter added TB_KAFKA_ACKS="1"

This commit is contained in:
Sergey Matvienko 2021-04-21 19:51:59 +03:00 committed by Andrew Shvayka
parent a569058c29
commit c94dc5972e
3 changed files with 5 additions and 2 deletions

View File

@ -88,7 +88,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
logger.error(err.stack); logger.error(err.stack);
} }
} }
logger.warn('[%s] SLOW PROCESSING [%s]ms, functionName [%s], request: ', requestId, tTook, functionName); logger.warn('[%s] SLOW PROCESSING [%s]ms, functionName [%s]', requestId, tTook, functionName);
if (slowQueryLogBody) { if (slowQueryLogBody) {
logger.info('Slow request body: %s', JSON.stringify(request, null, 4)) logger.info('Slow request body: %s', JSON.stringify(request, null, 4))
} }

View File

@ -25,6 +25,7 @@ kafka:
# Kafka Bootstrap Servers # Kafka Bootstrap Servers
servers: "TB_KAFKA_SERVERS" servers: "TB_KAFKA_SERVERS"
replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR"
acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge
topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD"
client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
const {logLevel, Kafka} = require('kafkajs'); const {logLevel, Kafka, CompressionTypes} = require('kafkajs');
const config = require('config'), const config = require('config'),
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
@ -22,6 +22,7 @@ const config = require('config'),
const replicationFactor = config.get('kafka.replication_factor'); const replicationFactor = config.get('kafka.replication_factor');
const topicProperties = config.get('kafka.topic_properties'); const topicProperties = config.get('kafka.topic_properties');
const kafkaClientId = config.get('kafka.client_id'); const kafkaClientId = config.get('kafka.client_id');
const acks = config.get('kafka.acks');
let kafkaClient; let kafkaClient;
let kafkaAdmin; let kafkaAdmin;
@ -35,6 +36,7 @@ function KafkaProducer() {
return producer.send( return producer.send(
{ {
topic: responseTopic, topic: responseTopic,
acks: acks,
messages: [ messages: [
{ {
key: scriptId, key: scriptId,