From 18ca330c9ea4120c6d8d28d18ad0d1adf13c89e9 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 28 Aug 2020 17:02:04 +0300 Subject: [PATCH] in memory queue logs --- application/src/main/resources/logback.xml | 2 ++ application/src/main/resources/thingsboard.yml | 4 ++++ .../server/queue/memory/InMemoryStorage.java | 15 ++------------- .../queue/memory/InMemoryTbQueueProducer.java | 2 +- .../provider/InMemoryMonolithQueueFactory.java | 7 +++++++ 5 files changed, 16 insertions(+), 14 deletions(-) diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 01aec9ceb9..81eb788e35 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -29,6 +29,8 @@ + + diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b6894bb0e7..ac76b67cd3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -594,6 +594,10 @@ swagger: queue: type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) + in_memory: + stats: + # For debug lvl + print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" acks: "${TB_KAFKA_ACKS:all}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java index 994ca26305..31f7958bbf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java @@ -23,27 +23,21 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; @Slf4j public final class InMemoryStorage { private static InMemoryStorage instance; private final ConcurrentHashMap> storage; - private static ScheduledExecutorService statExecutor; private InMemoryStorage() { storage = new ConcurrentHashMap<>(); - statExecutor = Executors.newSingleThreadScheduledExecutor(); - statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS); } - private void printStats() { + public void printStats() { storage.forEach((topic, queue) -> { if (queue.size() > 0) { - log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size()); + log.debug("[{}] Queue Size [{}]", topic, queue.size()); } }); } @@ -90,9 +84,4 @@ public final class InMemoryStorage { storage.clear(); } - public void destroy() { - if (statExecutor != null) { - statExecutor.shutdownNow(); - } - } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java index 84a9a1fdf0..cfcd788a16 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java @@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer implements TbQueuePro @Override public void stop() { - storage.destroy(); + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index fbdbeaab0c..8e2ff01e28 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; @@ -120,4 +122,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { return null; } + + @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}") + private void printInMemoryStats() { + InMemoryStorage.getInstance().printStats(); + } }