TbKafkaProducerTemplate: addAnalyticHeaders optimized

This commit is contained in:
Sergey Matvienko 2023-07-27 19:40:27 +02:00
parent c3e9ab5991
commit 2cccd0951a

View File

@ -80,20 +80,18 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
} }
void addAnalyticHeaders(List<Header> headers) { void addAnalyticHeaders(List<Header> headers) {
try { headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8)));
if (log.isDebugEnabled()) { headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); if (log.isTraceEnabled()) {
headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); try {
}
if (log.isTraceEnabled()) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
int maxlevel = Math.min(stackTrace.length, 10); int maxlevel = Math.min(stackTrace.length, 10);
for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders
headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8)));
} }
} catch (Throwable t) {
log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t);
} }
} catch (Throwable t) {
log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t);
} }
} }
@ -105,7 +103,9 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
byte[] data = msg.getData(); byte[] data = msg.getData();
ProducerRecord<String, byte[]> record; ProducerRecord<String, byte[]> record;
List<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); List<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
addAnalyticHeaders(headers); if (log.isDebugEnabled()) {
addAnalyticHeaders(headers);
}
record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
producer.send(record, (metadata, exception) -> { producer.send(record, (metadata, exception) -> {
if (exception == null) { if (exception == null) {