diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 01aec9ceb9..81eb788e35 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -29,6 +29,8 @@
+
+
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index b6894bb0e7..ac76b67cd3 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -594,6 +594,10 @@ swagger:
queue:
type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
+ in_memory:
+ stats:
+ # For debug lvl
+ print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
kafka:
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java
index 994ca26305..31f7958bbf 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java
@@ -23,27 +23,21 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
@Slf4j
public final class InMemoryStorage {
private static InMemoryStorage instance;
private final ConcurrentHashMap> storage;
- private static ScheduledExecutorService statExecutor;
private InMemoryStorage() {
storage = new ConcurrentHashMap<>();
- statExecutor = Executors.newSingleThreadScheduledExecutor();
- statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
}
- private void printStats() {
+ public void printStats() {
storage.forEach((topic, queue) -> {
if (queue.size() > 0) {
- log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
+ log.debug("[{}] Queue Size [{}]", topic, queue.size());
}
});
}
@@ -90,9 +84,4 @@ public final class InMemoryStorage {
storage.clear();
}
- public void destroy() {
- if (statExecutor != null) {
- statExecutor.shutdownNow();
- }
- }
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java
index 84a9a1fdf0..cfcd788a16 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java
@@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer implements TbQueuePro
@Override
public void stop() {
- storage.destroy();
+
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
index fbdbeaab0c..8ebda6e7b9 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
@@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
+import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
@@ -47,6 +49,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
+ private final InMemoryStorage storage;
public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@@ -59,6 +62,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
this.ruleEngineSettings = ruleEngineSettings;
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
+ this.storage = InMemoryStorage.getInstance();
}
@Override
@@ -120,4 +124,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() {
return null;
}
+
+ @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
+ private void printInMemoryStats() {
+ storage.printStats();
+ }
}