mqtt transport accumulate msg before device isConnected

This commit is contained in:
Sergey Matvienko 2021-07-29 18:51:00 +03:00 committed by Andrew Shvayka
parent 49a156846d
commit 5741e34c62
5 changed files with 71 additions and 7 deletions

View File

@ -23,10 +23,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
/**
* Created by ashvayka on 04.10.18.
*/
@ -59,4 +63,11 @@ public class MqttTransportContext extends TransportContext {
@Setter
private SslHandler sslHandler;
@Getter
@Value("${transport.mqtt.msg_queue_size_per_device_limit:10000}")
private int messageQueueSizePerDeviceLimit;
@Getter
private final ExecutorService msqProcessorExecutor = ThingsBoardExecutors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() + 1, getClass());
}

View File

@ -177,7 +177,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else if (deviceSessionCtx.isProvisionOnly()) {
processProvisionSessionMsg(ctx, msg);
} else {
processRegularSessionMsg(ctx, msg);
enqueueRegularSessionMsg(ctx, msg);
}
}
@ -223,6 +223,41 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void enqueueRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
final int queueSize = deviceSessionCtx.getMsgQueueSize().incrementAndGet();
if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
log.warn("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueue().size());
ctx.close();
return;
}
deviceSessionCtx.getMsgQueue().add(msg);
processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
}
private void processMsgQueue(ChannelHandlerContext ctx) {
if (!deviceSessionCtx.isConnected()) {
log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size());
return;
}
while (!deviceSessionCtx.getMsgQueue().isEmpty()) {
if (deviceSessionCtx.getMsgQueueProcessorLock().tryLock()) {
try {
MqttMessage msg;
while ((msg = deviceSessionCtx.getMsgQueue().poll()) != null) {
deviceSessionCtx.getMsgQueueSize().decrementAndGet();
processRegularSessionMsg(ctx, msg);
}
} finally {
deviceSessionCtx.getMsgQueueProcessorLock().unlock();
}
} else {
return;
}
}
}
private void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
switch (msg.fixedHeader().messageType()) {
case PUBLISH:
@ -762,6 +797,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
deviceSessionCtx.setDisconnected();
}
if (!deviceSessionCtx.getMsgQueue().isEmpty()) {
log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size());
deviceSessionCtx.getMsgQueue().clear();
}
}
@ -779,7 +819,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
checkGatewaySession(sessionMetaData);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true);
log.info("[{}] Client connected!", sessionId);
context.getMsqProcessorExecutor().execute(()->processMsgQueue(ctx));
}
@Override

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.session;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -35,8 +36,11 @@ import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter;
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Andrew Shvayka
@ -45,13 +49,23 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
@Getter
@Setter
private ChannelHandlerContext channel;
@Getter
private MqttTransportContext context;
private final MqttTransportContext context;
private final AtomicInteger msgIdSeq = new AtomicInteger(0);
@Getter
private final ConcurrentLinkedQueue<MqttMessage> msgQueue = new ConcurrentLinkedQueue<>();
@Getter
private final Lock msgQueueProcessorLock = new ReentrantLock();
@Getter
private final AtomicInteger msgQueueSize = new AtomicInteger(0);
@Getter
@Setter
private boolean provisionOnly = false;
@ -73,10 +87,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
this.context = context;
}
public void setChannel(ChannelHandlerContext channel) {
this.channel = channel;
}
public int nextMsgId() {
return msgIdSeq.incrementAndGet();
}

View File

@ -46,6 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
@Setter
private volatile TransportProtos.SessionInfoProto sessionInfo;
@Setter
private volatile boolean connected;
public DeviceId getDeviceId() {
@ -54,7 +55,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
this.deviceInfo = deviceInfo;
this.connected = true;
this.deviceId = deviceInfo.getDeviceId();
}

View File

@ -89,6 +89,7 @@ transport:
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
bind_port: "${MQTT_BIND_PORT:1883}"
timeout: "${MQTT_TIMEOUT:10000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:10000}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism
netty:
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"