From cbe51cee57b693efba9ec8b80ceca536c204021e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 3 Aug 2021 19:24:39 +0300 Subject: [PATCH] execute processMsgQueue on connected callback in separate thread because producer.send() will execute in Producer worker thread and can block messages producing --- .../server/transport/mqtt/MqttTransportHandler.java | 2 +- .../server/common/transport/TransportService.java | 4 ++++ .../common/transport/service/DefaultTransportService.java | 5 +++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index be6c47238b..3e3f2e74d9 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -825,7 +825,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.info("[{}] Client connected!", sessionId); - processMsgQueue(ctx); + transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread. } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 5f2fa4f197..237954c553 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -56,6 +56,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import java.util.concurrent.ExecutorService; + /** * Created by ashvayka on 04.10.18. */ @@ -131,4 +133,6 @@ public interface TransportService { void log(SessionInfoProto sessionInfo, String msg); void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback empty); + + ExecutorService getCallbackExecutor(); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index e3390eb9bb..c705d8364a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1141,4 +1141,9 @@ public class DefaultTransportService implements TransportService { callback.onError(e); } } + + @Override + public ExecutorService getCallbackExecutor() { + return transportCallbackExecutor; + } }