diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index b171f2d8d8..e07be52491 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -92,10 +92,10 @@ public class DefaultTbQueueRequestTemplate responses = responseTemplate.poll(pollInterval); - if (responses.size() > 0) { - log.trace("Polling responses completed, consumer records count [{}]", responses.size()); - } + final int pendingRequestsCount = pendingRequests.size(); + log.trace("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingRequestsCount); + List responses = responseTemplate.poll(pollInterval); //poll js responses + log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}]", responseTemplate.getTopic(), pendingRequestsCount, responses.size()); responses.forEach(response -> { byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER); UUID requestId; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index fe4300a421..816a732d41 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.util.StopWatch; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; @@ -82,7 +83,16 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected List> doPoll(long durationInMillis) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + log.trace("poll topic {} maxDuration {}", getTopic(), durationInMillis); + ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); + + stopWatch.stop(); + log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis()); + if (records.isEmpty()) { return Collections.emptyList(); } else {