added logs and timing metrics for DefaultTbQueueRequestTemplate, TbKafkaConsumerTemplate (todo revert or refactor)

This commit is contained in:
Sergey Matvienko 2021-04-27 18:26:52 +03:00
parent 2a5ba8e8ad
commit a04eac6015
2 changed files with 14 additions and 4 deletions

View File

@ -92,10 +92,10 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
long nextCleanupMs = 0L; long nextCleanupMs = 0L;
while (!stopped) { while (!stopped) {
try { try {
List<Response> responses = responseTemplate.poll(pollInterval); final int pendingRequestsCount = pendingRequests.size();
if (responses.size() > 0) { log.trace("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingRequestsCount);
log.trace("Polling responses completed, consumer records count [{}]", responses.size()); List<Response> responses = responseTemplate.poll(pollInterval); //poll js responses
} log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}]", responseTemplate.getTopic(), pendingRequestsCount, responses.size());
responses.forEach(response -> { responses.forEach(response -> {
byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER); byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
UUID requestId; UUID requestId;

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.util.StopWatch;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
@ -82,7 +83,16 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
@Override @Override
protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) { protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.trace("poll topic {} maxDuration {}", getTopic(), durationInMillis);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis)); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
stopWatch.stop();
log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis());
if (records.isEmpty()) { if (records.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
} else { } else {