From c3e9ab59918f04c5eb9d79df47948c187d09e7cf Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 26 Jul 2023 20:38:22 +0200 Subject: [PATCH 1/2] TbKafkaProducerTemplate will add headers for each message when log level: DEBUG - producerId and thread name; TRACE - stacktrace first 10-2=8 lines --- .../queue/kafka/TbKafkaProducerTemplate.java | 28 +++++++++- .../kafka/TbKafkaProducerTemplateTest.java | 54 +++++++++++++++++++ .../queue/src/test/resources/logback-test.xml | 20 +++++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java create mode 100644 common/queue/src/test/resources/logback-test.xml diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 15c2f04d17..7c1c28b9f5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -30,6 +30,9 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -53,10 +56,14 @@ public class TbKafkaProducerTemplate implements TbQueuePro private final Set topics; + @Getter + private final String clientId; + @Builder private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) { Properties props = settings.toProducerProps(); + this.clientId = Objects.requireNonNull(clientId, "Kafka producer client.id is null"); if (!StringUtils.isEmpty(clientId)) { props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); } @@ -72,6 +79,24 @@ public class TbKafkaProducerTemplate implements TbQueuePro public void init() { } + void addAnalyticHeaders(List
headers) { + try { + if (log.isDebugEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + } + if (log.isTraceEnabled()) { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + int maxlevel = Math.min(stackTrace.length, 10); + for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders + headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); + } + } + } catch (Throwable t) { + log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); + } + } + @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { try { @@ -79,7 +104,8 @@ public class TbKafkaProducerTemplate implements TbQueuePro String key = msg.getKey().toString(); byte[] data = msg.getData(); ProducerRecord record; - Iterable
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); + addAnalyticHeaders(headers); record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) { diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java new file mode 100644 index 0000000000..bfd3c4a6dc --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplateTest.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; + +@Slf4j +class TbKafkaProducerTemplateTest { + + TbKafkaProducerTemplate producerTemplate; + + @BeforeEach + void setUp() { + producerTemplate = mock(TbKafkaProducerTemplate.class); + willCallRealMethod().given(producerTemplate).addAnalyticHeaders(any()); + willReturn("tb-core-to-core-notifications-tb-core-3").given(producerTemplate).getClientId(); + } + + @Test + void testAddAnalyticHeaders() { + List
headers = new ArrayList<>(); + producerTemplate.addAnalyticHeaders(headers); + assertThat(headers).isNotEmpty(); + headers.forEach(r -> log.info("RecordHeader key [{}] value [{}]", r.key(), new String(r.value(), StandardCharsets.UTF_8))); + } + +} diff --git a/common/queue/src/test/resources/logback-test.xml b/common/queue/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..f7053313d4 --- /dev/null +++ b/common/queue/src/test/resources/logback-test.xml @@ -0,0 +1,20 @@ + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + From 2cccd0951a491c8a63264a6982d2e3744e78aaeb Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 19:40:27 +0200 Subject: [PATCH 2/2] TbKafkaProducerTemplate: addAnalyticHeaders optimized --- .../queue/kafka/TbKafkaProducerTemplate.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index 7c1c28b9f5..850f5c08ea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -80,20 +80,18 @@ public class TbKafkaProducerTemplate implements TbQueuePro } void addAnalyticHeaders(List
headers) { - try { - if (log.isDebugEnabled()) { - headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); - headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); - } - if (log.isTraceEnabled()) { + headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); + if (log.isTraceEnabled()) { + try { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); int maxlevel = Math.min(stackTrace.length, 10); for (int i = 2; i < maxlevel; i++) { // ignore two levels: getStackTrace and addAnalyticHeaders headers.add(new RecordHeader("_stackTrace" + i, stackTrace[i].toString().getBytes(StandardCharsets.UTF_8))); } + } catch (Throwable t) { + log.trace("Failed to add stacktrace headers in Kafka producer {}", getClientId(), t); } - } catch (Throwable t) { - log.debug("Failed to add analytic header in Kafka producer {}", getClientId(), t); } } @@ -105,7 +103,9 @@ public class TbKafkaProducerTemplate implements TbQueuePro byte[] data = msg.getData(); ProducerRecord record; List
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); - addAnalyticHeaders(headers); + if (log.isDebugEnabled()) { + addAnalyticHeaders(headers); + } record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); producer.send(record, (metadata, exception) -> { if (exception == null) {