execute processMsgQueue on connected callback in separate thread because producer.send() will execute in Producer worker thread and can block messages producing

This commit is contained in:
Sergey Matvienko 2021-08-03 19:24:39 +03:00 committed by Andrew Shvayka
parent 607fd7a74f
commit cbe51cee57
3 changed files with 10 additions and 1 deletions

View File

@ -825,7 +825,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true); deviceSessionCtx.setConnected(true);
log.info("[{}] Client connected!", sessionId); log.info("[{}] Client connected!", sessionId);
processMsgQueue(ctx); transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
} }
@Override @Override

View File

@ -56,6 +56,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import java.util.concurrent.ExecutorService;
/** /**
* Created by ashvayka on 04.10.18. * Created by ashvayka on 04.10.18.
*/ */
@ -131,4 +133,6 @@ public interface TransportService {
void log(SessionInfoProto sessionInfo, String msg); void log(SessionInfoProto sessionInfo, String msg);
void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback<Void> empty); void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback<Void> empty);
ExecutorService getCallbackExecutor();
} }

View File

@ -1141,4 +1141,9 @@ public class DefaultTransportService implements TransportService {
callback.onError(e); callback.onError(e);
} }
} }
@Override
public ExecutorService getCallbackExecutor() {
return transportCallbackExecutor;
}
} }