From 7e27c5b6833a725c4d0c4e524b3c483d95001451 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 13 Jun 2023 18:17:25 +0200 Subject: [PATCH 1/3] mqtt-client: messages processing moved from netty event loop pool and to the handlerExecutor to make netty handlers non-blocking --- .../msa/connectivity/MqttClientTest.java | 16 ++- .../connectivity/MqttGatewayClientTest.java | 16 ++- netty-mqtt/pom.xml | 4 + .../thingsboard/mqtt/MqttChannelHandler.java | 113 +++++++++++++----- .../java/org/thingsboard/mqtt/MqttClient.java | 7 +- .../org/thingsboard/mqtt/MqttClientImpl.java | 15 ++- .../thingsboard/mqtt/MqttSubscription.java | 2 +- .../mqtt/integration/MqttIntegrationTest.java | 16 ++- .../rule/engine/mqtt/TbMqttNode.java | 2 +- 9 files changed, 153 insertions(+), 38 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 893c8b565c..96e57549a8 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; @@ -74,8 +75,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi public class MqttClientTest extends AbstractContainerTest { private Device device; + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void setUp() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); device = testRestClient.postDevice("", defaultDevicePrototype("http_")); } @@ -83,6 +94,9 @@ public class MqttClientTest extends AbstractContainerTest { @AfterMethod public void tearDown() { testRestClient.deleteDeviceIfExists(device.getId()); + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test public void telemetryUpload() throws Exception { @@ -465,7 +479,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index a038d4cf50..8cddb69fa3 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -32,6 +32,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; @@ -76,8 +77,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest { private MqttMessageListener listener; private JsonParser jsonParser = new JsonParser(); + AbstractListeningExecutor handlerExecutor; + @BeforeMethod public void createGateway() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + testRestClient.login("tenant@thingsboard.org", "tenant"); gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype()); DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); @@ -94,6 +105,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest { this.listener = null; this.mqttClient = null; this.createdDevice = null; + if (handlerExecutor != null) { + handlerExecutor.destroy(); + } } @Test @@ -407,7 +421,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); - MqttClient mqttClient = MqttClient.create(clientConfig, listener); + MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); mqttClient.connect("localhost", 1883).get(); return mqttClient; } diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml index 60883f4b34..400b486e18 100644 --- a/netty-mqtt/pom.xml +++ b/netty-mqtt/pom.xml @@ -35,6 +35,10 @@ + + org.thingsboard.common + util + io.netty netty-codec-mqtt diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index e243f66633..6b3a4e009e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -16,6 +16,10 @@ package org.thingsboard.mqtt; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j final class MqttChannelHandler extends SimpleChannelInboundHandler { private final MqttClientImpl client; @@ -110,27 +121,48 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler super.channelInactive(ctx); } - private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { - boolean handlerInvoked = false; - for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { - if (subscription.matches(message.variableHeader().topicName())) { - if (subscription.isOnce() && subscription.isCalled()) { - continue; + ListenableFuture invokeHandlersForIncomingPublish(MqttPublishMessage message) { + var future = Futures.immediateVoidFuture(); + var handlerInvoked = new AtomicBoolean(); + try { + for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { + if (subscription.matches(message.variableHeader().topicName())) { + future = Futures.transform(future, x -> { + if (subscription.isOnce() && subscription.isCalled()) { + return null; + } + message.payload().markReaderIndex(); + subscription.setCalled(true); + subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); + if (subscription.isOnce()) { + this.client.off(subscription.getTopic(), subscription.getHandler()); + } + message.payload().resetReaderIndex(); + handlerInvoked.set(true); + return null; + }, client.getHandlerExecutor()); } - message.payload().markReaderIndex(); - subscription.setCalled(true); - subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload()); - if (subscription.isOnce()) { - this.client.off(subscription.getTopic(), subscription.getHandler()); - } - message.payload().resetReaderIndex(); - handlerInvoked = true; } + future = Futures.transform(future, x -> { + if (!handlerInvoked.get() && client.getDefaultHandler() != null) { + client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); + } + return null; + }, client.getHandlerExecutor()); + } finally { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void result) { + message.payload().release(); + } + + @Override + public void onFailure(Throwable t) { + message.payload().release(); + } + }, MoreExecutors.directExecutor()); } - if (!handlerInvoked && client.getDefaultHandler() != null) { - client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); - } - message.payload().release(); + return future; } private void handleConack(Channel channel, MqttConnAckMessage message) { @@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler break; case AT_LEAST_ONCE: - invokeHandlersForIncomingPublish(message); + var future = invokeHandlersForIncomingPublish(message); if (message.variableHeader().packetId() != -1) { - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); - channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } break; @@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler } private void handlePubrel(Channel channel, MqttMessage message) { + var future = Futures.immediateVoidFuture(); if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); + future = Futures.transform(future, x -> { + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); + return null; + }, MoreExecutors.directExecutor()); } - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); - channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + future.addListener(() -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); + }, MoreExecutors.directExecutor()); } private void handlePubcomp(MqttMessage message) { @@ -274,4 +314,23 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.getPayload().release(); pendingPublish.onPubcompReceived(); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + if (cause instanceof IOException) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause); + } else if (log.isInfoEnabled()) { + log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), + cause.getMessage()); + } + } else { + log.warn("exceptionCaught", cause); + } + } finally { + ReferenceCountUtil.release(cause); + } + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java index 2fe179de31..536a76119f 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import org.thingsboard.common.util.ListeningExecutor; public interface MqttClient { @@ -71,6 +72,8 @@ public interface MqttClient { */ void setEventLoop(EventLoopGroup eventLoop); + ListeningExecutor getHandlerExecutor(); + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * @@ -180,8 +183,8 @@ public interface MqttClient { * @param config The config object to use while looking for settings * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions */ - static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){ - return new MqttClientImpl(config, defaultHandler); + static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){ + return new MqttClientImpl(config, defaultHandler, handlerExecutor); } /** diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index f38a790be7..63d65a1cc2 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ListeningExecutor; import java.util.Collections; import java.util.HashSet; @@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient { private int port; private MqttClientCallback callback; + private final ListeningExecutor handlerExecutor; /** * Construct the MqttClientImpl with default config */ - public MqttClientImpl(MqttHandler defaultHandler) { - this.clientConfig = new MqttClientConfig(); - this.defaultHandler = defaultHandler; + public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { + this(new MqttClientConfig(), defaultHandler, handlerExecutor); } /** @@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient { * * @param clientConfig The config object to use while looking for settings */ - public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) { + public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) { this.clientConfig = clientConfig; this.defaultHandler = defaultHandler; + this.handlerExecutor = handlerExecutor; } /** @@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient { this.eventLoop = eventLoop; } + @Override + public ListeningExecutor getHandlerExecutor() { + return this.handlerExecutor; + } + /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 6c4abb4c5c..c4bc9e38c1 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -25,7 +25,7 @@ final class MqttSubscription { private final boolean once; - private boolean called; + private volatile boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { if (topic == null) { diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index cb1b6b81fe..f39ca01110 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -26,6 +26,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; @@ -49,8 +50,18 @@ public class MqttIntegrationTest { MqttClient mqttClient; + AbstractListeningExecutor handlerExecutor; + @Before public void init() throws Exception { + this.handlerExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 4; + } + }; + handlerExecutor.init(); + this.eventLoopGroup = new NioEventLoopGroup(); this.mqttServer = new MqttServer(); @@ -68,6 +79,9 @@ public class MqttIntegrationTest { if (this.eventLoopGroup != null) { this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } + if (this.handlerExecutor != null) { + this.handlerExecutor.destroy(); + } } @Test @@ -110,7 +124,7 @@ public class MqttIntegrationTest { MqttClientConfig config = new MqttClientConfig(); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, handlerExecutor); client.setEventLoop(this.eventLoopGroup); Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 121b9fb756..49b31cb33c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -114,7 +114,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null); + MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; From d74e0c45df8442e709928c3c04efe36442782a07 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 20 Jun 2023 14:26:30 +0200 Subject: [PATCH 2/3] MqttHandler - processAsync (required for AbstractMqttIntegration) --- .../thingsboard/server/msa/connectivity/MqttClientTest.java | 4 +++- .../server/msa/connectivity/MqttGatewayClientTest.java | 4 +++- .../src/main/java/org/thingsboard/mqtt/MqttHandler.java | 3 ++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 96e57549a8..940bd6777e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -493,9 +494,10 @@ public class MqttClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 8cddb69fa3..de11df2623 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.msa.connectivity; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -435,9 +436,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } @Override - public void onMessage(String topic, ByteBuf message) { + public ListenableFuture onMessage(String topic, ByteBuf message) { log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); + return Futures.immediateVoidFuture(); } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java index 0ec03ff04b..21c07a17cd 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java @@ -15,9 +15,10 @@ */ package org.thingsboard.mqtt; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; public interface MqttHandler { - void onMessage(String topic, ByteBuf payload); + ListenableFuture onMessage(String topic, ByteBuf payload); } From ed6614af71a3c0d4c08c9772ed84f08e2e2c0ea3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 Jul 2023 20:59:58 +0200 Subject: [PATCH 3/3] MqttClientConfig - ownerId added for exceptions logging purposes. MqttChannelHandler - improved logging --- .../server/msa/connectivity/MqttClientTest.java | 5 +++++ .../server/msa/connectivity/MqttGatewayClientTest.java | 5 +++++ .../java/org/thingsboard/mqtt/MqttChannelHandler.java | 10 ++++------ .../java/org/thingsboard/mqtt/MqttClientConfig.java | 6 +++++- .../mqtt/integration/MqttIntegrationTest.java | 1 + .../org/thingsboard/rule/engine/mqtt/TbMqttNode.java | 6 +++++- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 940bd6777e..5a7e9b631d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -476,8 +476,13 @@ public class MqttClientTest extends AbstractContainerTest { return getMqttClient(deviceCredentials.getCredentialsId(), listener); } + private String getOwnerId() { + return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]"; + } + private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index de11df2623..5e2a9f5fe6 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -418,8 +418,13 @@ public class MqttGatewayClientTest extends AbstractContainerTest { return testRestClient.getDeviceById(createdDeviceId); } + private String getOwnerId() { + return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]"; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index 6b3a4e009e..a40c30ef96 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -320,14 +320,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler try { if (cause instanceof IOException) { if (log.isDebugEnabled()) { - log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause); - } else if (log.isInfoEnabled()) { - log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause.getMessage()); + log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause); + } else { + log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage()); } } else { - log.warn("exceptionCaught", cause); + log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause); } } finally { ReferenceCountUtil.release(cause); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 1e20a842c3..10cee5d5fc 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; +import lombok.Getter; +import lombok.Setter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,10 +28,12 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { - private final SslContext sslContext; private final String randomClientId; + @Getter + @Setter + private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes private String clientId; private int timeoutSeconds = 60; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index f39ca01110..04c2be1740 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -122,6 +122,7 @@ public class MqttIntegrationTest { private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); + config.setOwnerId("MqttIntegrationTest"); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null, handlerExecutor); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 49b31cb33c..1376f29710 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode { } } + String getOwnerId(TbContext ctx) { + return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; + } + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); + config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());