From 5741e34c62ee021dda152bc66dbf851a54599506 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 29 Jul 2021 18:51:00 +0300 Subject: [PATCH] mqtt transport accumulate msg before device isConnected --- .../transport/mqtt/MqttTransportContext.java | 11 +++++ .../transport/mqtt/MqttTransportHandler.java | 44 ++++++++++++++++++- .../mqtt/session/DeviceSessionCtx.java | 20 ++++++--- .../session/DeviceAwareSessionContext.java | 2 +- .../src/main/resources/tb-mqtt-transport.yml | 1 + 5 files changed, 71 insertions(+), 7 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index f160936c7c..81a7390df9 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -23,10 +23,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutorService; + /** * Created by ashvayka on 04.10.18. */ @@ -59,4 +63,11 @@ public class MqttTransportContext extends TransportContext { @Setter private SslHandler sslHandler; + @Getter + @Value("${transport.mqtt.msg_queue_size_per_device_limit:10000}") + private int messageQueueSizePerDeviceLimit; + + @Getter + private final ExecutorService msqProcessorExecutor = ThingsBoardExecutors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() + 1, getClass()); + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 85b0190caa..0004f0dcf2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -177,7 +177,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (deviceSessionCtx.isProvisionOnly()) { processProvisionSessionMsg(ctx, msg); } else { - processRegularSessionMsg(ctx, msg); + enqueueRegularSessionMsg(ctx, msg); } } @@ -223,6 +223,41 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void enqueueRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { + final int queueSize = deviceSessionCtx.getMsgQueueSize().incrementAndGet(); + if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) { + log.warn("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}", + deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueue().size()); + ctx.close(); + return; + } + + deviceSessionCtx.getMsgQueue().add(msg); + processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool + } + + private void processMsgQueue(ChannelHandlerContext ctx) { + if (!deviceSessionCtx.isConnected()) { + log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size()); + return; + } + while (!deviceSessionCtx.getMsgQueue().isEmpty()) { + if (deviceSessionCtx.getMsgQueueProcessorLock().tryLock()) { + try { + MqttMessage msg; + while ((msg = deviceSessionCtx.getMsgQueue().poll()) != null) { + deviceSessionCtx.getMsgQueueSize().decrementAndGet(); + processRegularSessionMsg(ctx, msg); + } + } finally { + deviceSessionCtx.getMsgQueueProcessorLock().unlock(); + } + } else { + return; + } + } + } + private void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { switch (msg.fixedHeader().messageType()) { case PUBLISH: @@ -762,6 +797,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } deviceSessionCtx.setDisconnected(); } + + if (!deviceSessionCtx.getMsgQueue().isEmpty()) { + log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size()); + deviceSessionCtx.getMsgQueue().clear(); + } } @@ -779,7 +819,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); checkGatewaySession(sessionMetaData); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); + deviceSessionCtx.setConnected(true); log.info("[{}] Client connected!", sessionId); + context.getMsqProcessorExecutor().execute(()->processMsgQueue(ctx)); } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 3804d96cf6..b92e91981f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.session; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -35,8 +36,11 @@ import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter; import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author Andrew Shvayka @@ -45,13 +49,23 @@ import java.util.concurrent.atomic.AtomicInteger; public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { @Getter + @Setter private ChannelHandlerContext channel; @Getter - private MqttTransportContext context; + private final MqttTransportContext context; private final AtomicInteger msgIdSeq = new AtomicInteger(0); + @Getter + private final ConcurrentLinkedQueue msgQueue = new ConcurrentLinkedQueue<>(); + + @Getter + private final Lock msgQueueProcessorLock = new ReentrantLock(); + + @Getter + private final AtomicInteger msgQueueSize = new AtomicInteger(0); + @Getter @Setter private boolean provisionOnly = false; @@ -73,10 +87,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { this.context = context; } - public void setChannel(ChannelHandlerContext channel) { - this.channel = channel; - } - public int nextMsgId() { return msgIdSeq.incrementAndGet(); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 7d5bc281ec..1d2b7382d0 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -46,6 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @Setter private volatile TransportProtos.SessionInfoProto sessionInfo; + @Setter private volatile boolean connected; public DeviceId getDeviceId() { @@ -54,7 +55,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public void setDeviceInfo(TransportDeviceInfo deviceInfo) { this.deviceInfo = deviceInfo; - this.connected = true; this.deviceId = deviceInfo.getDeviceId(); } diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index c678d5afed..c03658703b 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -89,6 +89,7 @@ transport: bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}" bind_port: "${MQTT_BIND_PORT:1883}" timeout: "${MQTT_TIMEOUT:10000}" + msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:10000}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism netty: leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"