diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 696c314379..814dc268c1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -748,6 +748,8 @@ queue: compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" linger.ms: "${TB_KAFKA_LINGER_MS:1}" + max.request.size: "${TB_KAFKA_MAX_REQUEST_SIZE:1048576}" + max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 708a6c6168..a66cbbd232 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -63,6 +63,12 @@ public class TbKafkaSettings { @Value("${queue.kafka.linger.ms}") private long lingerMs; + @Value("${queue.kafka.max.request.size}") + private int maxRequestSize; + + @Value("${queue.kafka.max.in.flight.requests.per.connection:5}") + private int maxInFlightRequestsPerConnection; + @Value("${queue.kafka.buffer.memory}") private long bufferMemory; @@ -139,6 +145,8 @@ public class TbKafkaSettings { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection); return props; }