From d40bc488980abc04b60523ef83901f6a92e30932 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 30 Sep 2021 14:43:01 +0300 Subject: [PATCH] Ability to log number of active MQTT connections --- .../src/main/resources/thingsboard.yml | 3 +++ .../transport/mqtt/MqttTransportContext.java | 16 +++++++++++++ .../transport/mqtt/MqttTransportHandler.java | 12 ++++++++++ .../common/transport/TransportContext.java | 2 +- .../common/transport/TransportService.java | 3 +++ .../service/DefaultTransportService.java | 23 +++++++++++++++++++ .../src/main/resources/tb-coap-transport.yml | 3 +++ .../src/main/resources/tb-http-transport.yml | 3 +++ .../src/main/resources/tb-lwm2m-transport.yml | 3 +++ .../src/main/resources/tb-mqtt-transport.yml | 3 +++ .../src/main/resources/tb-snmp-transport.yml | 3 +++ 11 files changed, 73 insertions(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 95ba839792..2c8d0c63bc 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -702,6 +702,9 @@ transport: parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}" # to configure SNMP to work over UDP or TCP underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" # Edges parameters edges: diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 1335b6105b..34666d8d91 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -31,6 +31,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 04.10.18. @@ -71,4 +72,19 @@ public class MqttTransportContext extends TransportContext { @Getter @Value("${transport.mqtt.timeout:10000}") private long timeout; + + private final AtomicInteger connectionsCounter = new AtomicInteger(); + + @PostConstruct + public void init() { + transportService.createGaugeStats("openConnections", connectionsCounter); + } + + public void channelRegistered() { + connectionsCounter.incrementAndGet(); + } + + public void channelUnregistered() { + connectionsCounter.decrementAndGet(); + } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 80a2f49be7..0a115e12fa 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -146,6 +146,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.rpcAwaitingAck = new ConcurrentHashMap<>(); } + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + context.channelRegistered(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + super.channelUnregistered(ctx); + context.channelUnregistered(); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.trace("[{}] Processing msg: {}", sessionId, msg); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java index d48d3fd04e..ec21851b84 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java @@ -39,7 +39,7 @@ public abstract class TransportContext { protected final ObjectMapper mapper = new ObjectMapper(); @Autowired - private TransportService transportService; + protected TransportService transportService; @Autowired private TbServiceInfoProvider serviceInfoProvider; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 28ad7ddb9b..aff9543286 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -58,6 +58,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 04.10.18. @@ -138,4 +139,6 @@ public interface TransportService { ExecutorService getCallbackExecutor(); boolean hasSession(SessionInfoProto sessionInfo); + + void createGaugeStats(String openConnections, AtomicInteger connectionsCounter); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index bdc03d767c..7675c500c9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -21,9 +21,11 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.protobuf.ByteString; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -96,6 +98,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -110,6 +113,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Created by ashvayka on 17.10.18. @@ -135,6 +139,10 @@ public class DefaultTransportService implements TransportService { private long clientSideRpcTimeout; @Value("${queue.transport.poll_interval}") private int notificationsPollDuration; + @Value("${transport.stats.enabled:false}") + private boolean statsEnabled; + + private final Map statsMap = new LinkedHashMap<>(); private final Gson gson = new Gson(); private final TbTransportQueueFactory queueProvider; @@ -1167,4 +1175,19 @@ public class DefaultTransportService implements TransportService { public boolean hasSession(TransportProtos.SessionInfoProto sessionInfo) { return sessions.containsKey(toSessionId(sessionInfo)); } + + @Override + public void createGaugeStats(String statsName, AtomicInteger number) { + statsFactory.createGauge(StatsType.TRANSPORT + "." + statsName, number); + statsMap.put(statsName, number); + } + + @Scheduled(fixedDelayString = "${transport.stats.print-interval-ms:60000}") + public void printStats() { + if (statsEnabled) { + String values = statsMap.entrySet().stream() + .map(kv -> kv.getKey() + " [" + kv.getValue() + "]").collect(Collectors.joining(", ")); + log.info("Transport Stats: {}", values); + } + } } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 0f66f28efb..9aab46b824 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -122,6 +122,9 @@ transport: log: enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 8840cc8e8b..f35f4136e7 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -94,6 +94,9 @@ transport: log: enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index d398b676dd..4280ccb1af 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -147,6 +147,9 @@ transport: paging_transmission_window: "${LWM2M_PAGING_TRANSMISSION_WINDOW:10000}" # Use redis for Security and Registration stores redis.enabled: "${LWM2M_REDIS_ENABLED:false}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 2191508636..e0d0144d84 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -127,6 +127,9 @@ transport: log: enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" queue: diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 3453a7ace1..80bc257a8b 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -59,6 +59,9 @@ transport: log: enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" + stats: + enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" + print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)