kafka producer - call callback.onFailure in case of producer.send exception (like InterruptedException when buffer.memory and max.block.ms reached)

This commit is contained in:
Sergey Matvienko 2021-07-26 14:01:58 +03:00 committed by Andrew Shvayka
parent 2d4831af39
commit 2a4a9187d1

View File

@ -77,25 +77,34 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
@Override @Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
createTopicIfNotExist(tpi); try {
String key = msg.getKey().toString(); createTopicIfNotExist(tpi);
byte[] data = msg.getData(); String key = msg.getKey().toString();
ProducerRecord<String, byte[]> record; byte[] data = msg.getData();
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); ProducerRecord<String, byte[]> record;
record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers); Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
producer.send(record, (metadata, exception) -> { record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
if (exception == null) { producer.send(record, (metadata, exception) -> {
if (callback != null) { if (exception == null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata)); if (callback != null) {
} callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
} else { }
if (callback != null) {
callback.onFailure(exception);
} else { } else {
log.warn("Producer template failure: {}", exception.getMessage(), exception); if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
}
} }
});
} catch (Exception e) {
if (callback != null) {
callback.onFailure(e);
} else {
log.warn("Producer template failure (send method wrapper): {}", e.getMessage(), e);
} }
}); throw e;
}
} }
private void createTopicIfNotExist(TopicPartitionInfo tpi) { private void createTopicIfNotExist(TopicPartitionInfo tpi) {