[2.5]fix ConcurrentModificationException (#2560)

* fix ConcurrentModificationException

* kafka consumer improvements

* kafka consumer improvements

* refactored kafka consumer

* refactored kafka consumer
This commit is contained in:
Yevhen Bondarenko 2020-03-31 16:47:43 +03:00 committed by GitHub
parent a6e090ef86
commit b39328c989
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -33,6 +33,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
@ -46,6 +48,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
private final TbKafkaDecoder<T> decoder;
private volatile boolean subscribed;
private volatile Set<TopicPartitionInfo> partitions;
private final Lock consumerLock;
@Getter
private final String topic;
@ -71,6 +74,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.topic = topic;
this.consumerLock = new ReentrantLock();
}
@Override
@ -94,23 +98,30 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
log.debug("Failed to await subscription", e);
}
} else {
if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
topicNames.forEach(admin::createTopicIfNotExists);
consumer.subscribe(topicNames);
subscribed = true;
}
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
if (records.count() > 0) {
List<T> result = new ArrayList<>();
records.forEach(record -> {
try {
result.add(decode(record));
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
}
});
return result;
try {
consumerLock.lock();
if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
topicNames.forEach(admin::createTopicIfNotExists);
consumer.subscribe(topicNames);
subscribed = true;
}
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
if (records.count() > 0) {
List<T> result = new ArrayList<>();
records.forEach(record -> {
try {
result.add(decode(record));
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
}
});
return result;
}
} finally {
consumerLock.unlock();
}
}
return Collections.emptyList();
@ -118,12 +129,25 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
@Override
public void commit() {
consumer.commitAsync();
try {
consumerLock.lock();
consumer.commitAsync();
} finally {
consumerLock.unlock();
}
}
@Override
public void unsubscribe() {
consumer.unsubscribe();
try {
consumerLock.lock();
if (consumer != null) {
consumer.unsubscribe();
consumer.close();
}
} finally {
consumerLock.unlock();
}
}
public T decode(ConsumerRecord<String, byte[]> record) throws IOException {