Merge with develop/2.5.5
This commit is contained in:
		
						commit
						088ab976a1
					
				@ -29,6 +29,8 @@
 | 
			
		||||
 | 
			
		||||
<!--    <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
 | 
			
		||||
<!--    <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
 | 
			
		||||
<!--    <logger name="org.thingsboard.server.queue.memory.InMemoryStorage" level="DEBUG" />-->
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
<!--    <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
 | 
			
		||||
<!--    <logger name="org.thingsboard.server.service.telemetry" level="TRACE"/>-->
 | 
			
		||||
 | 
			
		||||
@ -608,6 +608,10 @@ swagger:
 | 
			
		||||
 | 
			
		||||
queue:
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
 | 
			
		||||
  in_memory:
 | 
			
		||||
    stats:
 | 
			
		||||
      # For debug lvl
 | 
			
		||||
      print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
 | 
			
		||||
  kafka:
 | 
			
		||||
    bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
 | 
			
		||||
    acks: "${TB_KAFKA_ACKS:all}"
 | 
			
		||||
 | 
			
		||||
@ -23,27 +23,21 @@ import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.BlockingQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
			
		||||
import java.util.concurrent.ScheduledExecutorService;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public final class InMemoryStorage {
 | 
			
		||||
    private static InMemoryStorage instance;
 | 
			
		||||
    private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
 | 
			
		||||
    private static ScheduledExecutorService statExecutor;
 | 
			
		||||
 | 
			
		||||
    private InMemoryStorage() {
 | 
			
		||||
        storage = new ConcurrentHashMap<>();
 | 
			
		||||
        statExecutor = Executors.newSingleThreadScheduledExecutor();
 | 
			
		||||
        statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void printStats() {
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        storage.forEach((topic, queue) -> {
 | 
			
		||||
            if (queue.size() > 0) {
 | 
			
		||||
                log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
 | 
			
		||||
                log.debug("[{}] Queue Size [{}]", topic, queue.size());
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
@ -90,9 +84,4 @@ public final class InMemoryStorage {
 | 
			
		||||
        storage.clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        if (statExecutor != null) {
 | 
			
		||||
            statExecutor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void stop() {
 | 
			
		||||
        storage.destroy();
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
 | 
			
		||||
@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryStorage;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
 | 
			
		||||
@ -47,6 +49,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
			
		||||
    private final TbQueueRuleEngineSettings ruleEngineSettings;
 | 
			
		||||
    private final TbQueueTransportApiSettings transportApiSettings;
 | 
			
		||||
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
 | 
			
		||||
    private final InMemoryStorage storage;
 | 
			
		||||
 | 
			
		||||
    public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
 | 
			
		||||
                                        TbQueueRuleEngineSettings ruleEngineSettings,
 | 
			
		||||
@ -59,6 +62,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
			
		||||
        this.ruleEngineSettings = ruleEngineSettings;
 | 
			
		||||
        this.transportApiSettings = transportApiSettings;
 | 
			
		||||
        this.transportNotificationSettings = transportNotificationSettings;
 | 
			
		||||
        this.storage = InMemoryStorage.getInstance();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -120,4 +124,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
 | 
			
		||||
    public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
 | 
			
		||||
    private void printInMemoryStats() {
 | 
			
		||||
        storage.printStats();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user