in memory queue logs
This commit is contained in:
parent
17ee07e09a
commit
18ca330c9e
@ -29,6 +29,8 @@
|
||||
|
||||
<!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
|
||||
<!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
|
||||
<!-- <logger name="org.thingsboard.server.queue.memory.InMemoryStorage" level="DEBUG" />-->
|
||||
|
||||
|
||||
<logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
|
||||
|
||||
|
||||
@ -594,6 +594,10 @@ swagger:
|
||||
|
||||
queue:
|
||||
type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
in_memory:
|
||||
stats:
|
||||
# For debug lvl
|
||||
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
kafka:
|
||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||
acks: "${TB_KAFKA_ACKS:all}"
|
||||
|
||||
@ -23,27 +23,21 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
public final class InMemoryStorage {
|
||||
private static InMemoryStorage instance;
|
||||
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
|
||||
private static ScheduledExecutorService statExecutor;
|
||||
|
||||
private InMemoryStorage() {
|
||||
storage = new ConcurrentHashMap<>();
|
||||
statExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void printStats() {
|
||||
public void printStats() {
|
||||
storage.forEach((topic, queue) -> {
|
||||
if (queue.size() > 0) {
|
||||
log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
|
||||
log.debug("[{}] Queue Size [{}]", topic, queue.size());
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -90,9 +84,4 @@ public final class InMemoryStorage {
|
||||
storage.clear();
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
if (statExecutor != null) {
|
||||
statExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
storage.destroy();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.memory.InMemoryStorage;
|
||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
|
||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
|
||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
|
||||
@ -120,4 +122,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
||||
public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
|
||||
private void printInMemoryStats() {
|
||||
InMemoryStorage.getInstance().printStats();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user