diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 6bd2a69b2b..43f7321120 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -21,22 +21,28 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.locks.Lock; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; + @Slf4j public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { private volatile boolean subscribed; protected volatile boolean stopped = false; protected volatile Set partitions; - protected final Lock consumerLock = new ReentrantLock(); + protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync + final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); @Getter private final String topic; @@ -47,84 +53,101 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void subscribe() { - consumerLock.lock(); - try { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; - } finally { - consumerLock.unlock(); + log.info("enqueue topic subscribe {} ", topic); + if (stopped) { + log.error("trying subscribe, but consumer stopped for topic {}", topic); + return; } + subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null, null, true))); } @Override public void subscribe(Set partitions) { - consumerLock.lock(); - try { - this.partitions = partitions; - subscribed = false; - } finally { - consumerLock.unlock(); + log.info("enqueue topics subscribe {} ", partitions); + if (stopped) { + log.error("trying subscribe, but consumer stopped for topic {}", topic); + return; } + subscribeQueue.add(partitions); } @Override public List poll(long durationInMillis) { + List records; + long startNanos = System.nanoTime(); + if (stopped) { + return errorAndReturnEmpty(); + } if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - long pollStartTs = System.currentTimeMillis(); - consumerLock.lock(); - try { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - doSubscribe(topicNames); - subscribed = true; - } + return sleepAndReturnEmpty(startNanos, durationInMillis); + } - List records; - if (partitions.isEmpty()) { - records = Collections.emptyList(); - } else { - records = doPoll(durationInMillis); + if (consumerLock.isLocked()) { + log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace")); + } + + consumerLock.lock(); + try { + while (!subscribeQueue.isEmpty()) { + subscribed = false; + partitions = subscribeQueue.poll(); + } + if (!subscribed) { + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + doSubscribe(topicNames); + subscribed = true; + } + records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis); + } finally { + consumerLock.unlock(); + } + + if (records.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); } + + return decodeRecords(records); + } + + @Nonnull + List decodeRecords(@Nonnull List records) { + List result = new ArrayList<>(records.size()); + records.forEach(record -> { + try { + if (record != null) { + result.add(decode(record)); } - if (!records.isEmpty()) { - List result = new ArrayList<>(records.size()); - records.forEach(record -> { - try { - if (record != null) { - result.add(decode(record)); - } - } catch (IOException e) { - log.error("Failed decode record: [{}]", record); - throw new RuntimeException("Failed to decode record: ", e); - } - }); - return result; - } else { - long pollDuration = System.currentTimeMillis() - pollStartTs; - if (pollDuration < durationInMillis) { - try { - Thread.sleep(durationInMillis - pollDuration); - } catch (InterruptedException e) { - if (!stopped) { - log.error("Failed to wait.", e); - } - } - } + } catch (IOException e) { + log.error("Failed decode record: [{}]", record); + throw new RuntimeException("Failed to decode record: ", e); + } + }); + return result; + } + + List errorAndReturnEmpty() { + log.error("poll invoked but consumer stopped for topic" + topic, new RuntimeException("stacktrace")); + return emptyList(); + } + + List sleepAndReturnEmpty(final long startNanos, final long durationInMillis) { + long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis); + long spentNanos = System.nanoTime() - startNanos; + if (spentNanos < durationNanos) { + try { + Thread.sleep(Math.max(TimeUnit.NANOSECONDS.toMillis(durationNanos - spentNanos), 1)); + } catch (InterruptedException e) { + if (!stopped) { + log.error("Failed to wait", e); } - } finally { - consumerLock.unlock(); } } - return Collections.emptyList(); + return emptyList(); } @Override public void commit() { + if (consumerLock.isLocked()) { + log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock", new RuntimeException("stacktrace")); + } consumerLock.lock(); try { doCommit(); @@ -135,6 +158,7 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { + log.info("unsubscribe topic and stop consumer {}", getTopic()); stopped = true; consumerLock.lock(); try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index b1b3d5ce05..4429f3518f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -72,8 +72,10 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue protected void doSubscribe(List topicNames) { if (!topicNames.isEmpty()) { topicNames.forEach(admin::createTopicIfNotExists); + log.info("subscribe topics {}", topicNames); consumer.subscribe(topicNames); } else { + log.info("unsubscribe due to empty topic list"); consumer.unsubscribe(); } } @@ -102,6 +104,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected void doUnsubscribe() { + log.info("unsubscribe topic and close consumer for topic {}", getTopic()); if (consumer != null) { consumer.unsubscribe(); consumer.close();