diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..084d10fca9 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -0,0 +1,136 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { + + private volatile boolean subscribed; + protected volatile Set partitions; + protected final Lock consumerLock = new ReentrantLock(); + + @Getter + private final String topic; + + public AbstractTbQueueConsumerTemplate(String topic) { + this.topic = topic; + } + + @Override + public void subscribe() { + consumerLock.lock(); + try { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public void subscribe(Set partitions) { + consumerLock.lock(); + try { + this.partitions = partitions; + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public List poll(long durationInMillis) { + if (!subscribed && partitions == null) { + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + log.debug("Failed to await subscription", e); + } + } else { + consumerLock.lock(); + try { + if (!subscribed) { + doSubscribe(); + subscribed = true; + } + + List records = doPoll(durationInMillis); + if (!records.isEmpty()) { + List result = new ArrayList<>(records.size()); + records.forEach(record -> { + try { + if (record != null) { + result.add(decode(record)); + } + } catch (IOException e) { + log.error("Failed decode record: [{}]", record); + throw new RuntimeException("Failed to decode record: ", e); + } + }); + return result; + } + } finally { + consumerLock.unlock(); + } + } + return Collections.emptyList(); + } + + @Override + public void commit() { + consumerLock.lock(); + try { + doCommit(); + } finally { + consumerLock.unlock(); + } + } + + @Override + public void unsubscribe() { + consumerLock.lock(); + try { + doUnsubscribe(); + } finally { + consumerLock.unlock(); + } + } + + abstract protected List doPoll(long durationInMillis); + + abstract protected T decode(R record) throws IOException; + + abstract protected void doSubscribe(); + + abstract protected void doCommit(); + + abstract protected void doUnsubscribe(); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 6b1c051eeb..fea31854df 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -16,7 +16,6 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -24,8 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import java.io.IOException; import java.time.Duration; @@ -33,26 +32,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. */ @Slf4j -public class TbKafkaConsumerTemplate implements TbQueueConsumer { +public class TbKafkaConsumerTemplate extends AbstractTbQueueConsumerTemplate, T> { private final TbQueueAdmin admin; private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; - private volatile boolean subscribed; - private volatile Set partitions; - private final Lock consumerLock; - - @Getter - private final String topic; @Builder private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, @@ -60,6 +50,7 @@ public class TbKafkaConsumerTemplate implements TbQueueCon boolean autoCommit, int autoCommitIntervalMs, int maxPollRecords, TbQueueAdmin admin) { + super(topic); Properties props = settings.toProps(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); if (groupId != null) { @@ -75,94 +66,43 @@ public class TbKafkaConsumerTemplate implements TbQueueCon this.admin = admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; - this.topic = topic; - this.consumerLock = new ReentrantLock(); } @Override - public void subscribe() { - consumerLock.lock(); - try { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; - } finally { - consumerLock.unlock(); - } + protected void doSubscribe() { + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + topicNames.forEach(admin::createTopicIfNotExists); + consumer.subscribe(topicNames); } @Override - public void subscribe(Set partitions) { - consumerLock.lock(); - try { - this.partitions = partitions; - subscribed = false; - } finally { - consumerLock.unlock(); - } - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } + protected List> doPoll(long durationInMillis) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); + if (records.isEmpty()) { + return Collections.emptyList(); } else { - consumerLock.lock(); - try { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - topicNames.forEach(admin::createTopicIfNotExists); - consumer.subscribe(topicNames); - subscribed = true; - } - - ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); - if (records.count() > 0) { - List result = new ArrayList<>(); - records.forEach(record -> { - try { - result.add(decode(record)); - } catch (IOException e) { - log.error("Failed decode record: [{}]", record); - } - }); - return result; - } - } finally { - consumerLock.unlock(); - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - consumerLock.lock(); - try { - consumer.commitAsync(); - } finally { - consumerLock.unlock(); + List> recordList = new ArrayList<>(256); + records.forEach(recordList::add); + return recordList; } } @Override - public void unsubscribe() { - consumerLock.lock(); - try { - if (consumer != null) { - consumer.unsubscribe(); - consumer.close(); - } - } finally { - consumerLock.unlock(); - } - } - public T decode(ConsumerRecord record) throws IOException { return decoder.decode(new KafkaTbQueueMsg(record)); } + @Override + protected void doCommit() { + consumer.commitAsync(); + } + + @Override + protected void doUnsubscribe() { + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } + } + }