Merge pull request #8996 from smatvienko-tb/feature/mqtt-client-channel-handler-invokeHandlersForIncomingPublish

MQTT client channel handler invoke handlers for incoming publish (master branch base)
This commit is contained in:
Andrew Shvayka 2023-07-31 17:25:23 +03:00 committed by GitHub
commit 14378f4876
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 180 additions and 43 deletions

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity; package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -28,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttClientConfig;
@ -74,8 +76,18 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi
public class MqttClientTest extends AbstractContainerTest { public class MqttClientTest extends AbstractContainerTest {
private Device device; private Device device;
AbstractListeningExecutor handlerExecutor;
@BeforeMethod @BeforeMethod
public void setUp() throws Exception { public void setUp() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
testRestClient.login("tenant@thingsboard.org", "tenant"); testRestClient.login("tenant@thingsboard.org", "tenant");
device = testRestClient.postDevice("", defaultDevicePrototype("http_")); device = testRestClient.postDevice("", defaultDevicePrototype("http_"));
} }
@ -83,6 +95,9 @@ public class MqttClientTest extends AbstractContainerTest {
@AfterMethod @AfterMethod
public void tearDown() { public void tearDown() {
testRestClient.deleteDeviceIfExists(device.getId()); testRestClient.deleteDeviceIfExists(device.getId());
if (handlerExecutor != null) {
handlerExecutor.destroy();
}
} }
@Test @Test
public void telemetryUpload() throws Exception { public void telemetryUpload() throws Exception {
@ -461,11 +476,16 @@ public class MqttClientTest extends AbstractContainerTest {
return getMqttClient(deviceCredentials.getCredentialsId(), listener); return getMqttClient(deviceCredentials.getCredentialsId(), listener);
} }
private String getOwnerId() {
return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]";
}
private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig(); MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test"); clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(username); clientConfig.setUsername(username);
MqttClient mqttClient = MqttClient.create(clientConfig, listener); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
mqttClient.connect("localhost", 1883).get(); mqttClient.connect("localhost", 1883).get();
return mqttClient; return mqttClient;
} }
@ -479,9 +499,10 @@ public class MqttClientTest extends AbstractContainerTest {
} }
@Override @Override
public void onMessage(String topic, ByteBuf message) { public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
} }
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity; package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -32,6 +33,7 @@ import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClient;
@ -76,8 +78,18 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
private MqttMessageListener listener; private MqttMessageListener listener;
private JsonParser jsonParser = new JsonParser(); private JsonParser jsonParser = new JsonParser();
AbstractListeningExecutor handlerExecutor;
@BeforeMethod @BeforeMethod
public void createGateway() throws Exception { public void createGateway() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
testRestClient.login("tenant@thingsboard.org", "tenant"); testRestClient.login("tenant@thingsboard.org", "tenant");
gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype()); gatewayDevice = testRestClient.postDevice("", defaultGatewayPrototype());
DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); DeviceCredentials gatewayDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId());
@ -94,6 +106,9 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
this.listener = null; this.listener = null;
this.mqttClient = null; this.mqttClient = null;
this.createdDevice = null; this.createdDevice = null;
if (handlerExecutor != null) {
handlerExecutor.destroy();
}
} }
@Test @Test
@ -403,11 +418,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
return testRestClient.getDeviceById(createdDeviceId); return testRestClient.getDeviceById(createdDeviceId);
} }
private String getOwnerId() {
return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]";
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig(); MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test"); clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId()); clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, listener); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
mqttClient.connect("localhost", 1883).get(); mqttClient.connect("localhost", 1883).get();
return mqttClient; return mqttClient;
} }
@ -421,9 +441,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
} }
@Override @Override
public void onMessage(String topic, ByteBuf message) { public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
} }
} }

View File

@ -35,6 +35,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId> <artifactId>netty-codec-mqtt</artifactId>

View File

@ -16,6 +16,10 @@
package org.thingsboard.mqtt; package org.thingsboard.mqtt;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
@ -34,8 +38,15 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> { final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
private final MqttClientImpl client; private final MqttClientImpl client;
@ -110,12 +121,15 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
super.channelInactive(ctx); super.channelInactive(ctx);
} }
private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { ListenableFuture<Void> invokeHandlersForIncomingPublish(MqttPublishMessage message) {
boolean handlerInvoked = false; var future = Futures.immediateVoidFuture();
var handlerInvoked = new AtomicBoolean();
try {
for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) { for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
if (subscription.matches(message.variableHeader().topicName())) { if (subscription.matches(message.variableHeader().topicName())) {
future = Futures.transform(future, x -> {
if (subscription.isOnce() && subscription.isCalled()) { if (subscription.isOnce() && subscription.isCalled()) {
continue; return null;
} }
message.payload().markReaderIndex(); message.payload().markReaderIndex();
subscription.setCalled(true); subscription.setCalled(true);
@ -124,15 +138,33 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
this.client.off(subscription.getTopic(), subscription.getHandler()); this.client.off(subscription.getTopic(), subscription.getHandler());
} }
message.payload().resetReaderIndex(); message.payload().resetReaderIndex();
handlerInvoked = true; handlerInvoked.set(true);
return null;
}, client.getHandlerExecutor());
} }
} }
if (!handlerInvoked && client.getDefaultHandler() != null) { future = Futures.transform(future, x -> {
if (!handlerInvoked.get() && client.getDefaultHandler() != null) {
client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload()); client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
} }
return null;
}, client.getHandlerExecutor());
} finally {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void result) {
message.payload().release(); message.payload().release();
} }
@Override
public void onFailure(Throwable t) {
message.payload().release();
}
}, MoreExecutors.directExecutor());
}
return future;
}
private void handleConack(Channel channel, MqttConnAckMessage message) { private void handleConack(Channel channel, MqttConnAckMessage message) {
switch (message.variableHeader().connectReturnCode()) { switch (message.variableHeader().connectReturnCode()) {
case CONNECTION_ACCEPTED: case CONNECTION_ACCEPTED:
@ -197,11 +229,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
break; break;
case AT_LEAST_ONCE: case AT_LEAST_ONCE:
invokeHandlersForIncomingPublish(message); var future = invokeHandlersForIncomingPublish(message);
if (message.variableHeader().packetId() != -1) { if (message.variableHeader().packetId() != -1) {
future.addListener(() -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
}, MoreExecutors.directExecutor());
} }
break; break;
@ -256,14 +290,20 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
} }
private void handlePubrel(Channel channel, MqttMessage message) { private void handlePubrel(Channel channel, MqttMessage message) {
var future = Futures.immediateVoidFuture();
if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); future = invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
future = Futures.transform(future, x -> {
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
return null;
}, MoreExecutors.directExecutor());
} }
future.addListener(() -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader)); channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
}, MoreExecutors.directExecutor());
} }
private void handlePubcomp(MqttMessage message) { private void handlePubcomp(MqttMessage message) {
@ -274,4 +314,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
pendingPublish.getPayload().release(); pendingPublish.getPayload().release();
pendingPublish.onPubcompReceived(); pendingPublish.onPubcompReceived();
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
if (cause instanceof IOException) {
if (log.isDebugEnabled()) {
log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause);
} else {
log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage());
}
} else {
log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause);
}
} finally {
ReferenceCountUtil.release(cause);
}
}
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.thingsboard.common.util.ListeningExecutor;
public interface MqttClient { public interface MqttClient {
@ -71,6 +72,8 @@ public interface MqttClient {
*/ */
void setEventLoop(EventLoopGroup eventLoop); void setEventLoop(EventLoopGroup eventLoop);
ListeningExecutor getHandlerExecutor();
/** /**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
* *
@ -180,8 +183,8 @@ public interface MqttClient {
* @param config The config object to use while looking for settings * @param config The config object to use while looking for settings
* @param defaultHandler The handler for incoming messages that do not match any topic subscriptions * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions
*/ */
static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){ static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler, ListeningExecutor handlerExecutor){
return new MqttClientImpl(config, defaultHandler); return new MqttClientImpl(config, defaultHandler, handlerExecutor);
} }
/** /**

View File

@ -19,6 +19,8 @@ import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import lombok.Getter;
import lombok.Setter;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -26,10 +28,12 @@ import java.util.Random;
@SuppressWarnings({"WeakerAccess", "unused"}) @SuppressWarnings({"WeakerAccess", "unused"})
public final class MqttClientConfig { public final class MqttClientConfig {
private final SslContext sslContext; private final SslContext sslContext;
private final String randomClientId; private final String randomClientId;
@Getter
@Setter
private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes
private String clientId; private String clientId;
private int timeoutSeconds = 60; private int timeoutSeconds = 60;
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;

View File

@ -46,6 +46,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ListeningExecutor;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -88,13 +89,13 @@ final class MqttClientImpl implements MqttClient {
private int port; private int port;
private MqttClientCallback callback; private MqttClientCallback callback;
private final ListeningExecutor handlerExecutor;
/** /**
* Construct the MqttClientImpl with default config * Construct the MqttClientImpl with default config
*/ */
public MqttClientImpl(MqttHandler defaultHandler) { public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
this.clientConfig = new MqttClientConfig(); this(new MqttClientConfig(), defaultHandler, handlerExecutor);
this.defaultHandler = defaultHandler;
} }
/** /**
@ -103,9 +104,10 @@ final class MqttClientImpl implements MqttClient {
* *
* @param clientConfig The config object to use while looking for settings * @param clientConfig The config object to use while looking for settings
*/ */
public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) { public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.defaultHandler = defaultHandler; this.defaultHandler = defaultHandler;
this.handlerExecutor = handlerExecutor;
} }
/** /**
@ -227,6 +229,11 @@ final class MqttClientImpl implements MqttClient {
this.eventLoop = eventLoop; this.eventLoop = eventLoop;
} }
@Override
public ListeningExecutor getHandlerExecutor() {
return this.handlerExecutor;
}
/** /**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
* *

View File

@ -15,9 +15,10 @@
*/ */
package org.thingsboard.mqtt; package org.thingsboard.mqtt;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
public interface MqttHandler { public interface MqttHandler {
void onMessage(String topic, ByteBuf payload); ListenableFuture<Void> onMessage(String topic, ByteBuf payload);
} }

View File

@ -25,7 +25,7 @@ final class MqttSubscription {
private final boolean once; private final boolean once;
private boolean called; private volatile boolean called;
MqttSubscription(String topic, MqttHandler handler, boolean once) { MqttSubscription(String topic, MqttHandler handler, boolean once) {
if (topic == null) { if (topic == null) {

View File

@ -26,6 +26,7 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.mqtt.MqttConnectResult;
@ -49,8 +50,18 @@ public class MqttIntegrationTest {
MqttClient mqttClient; MqttClient mqttClient;
AbstractListeningExecutor handlerExecutor;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
this.handlerExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 4;
}
};
handlerExecutor.init();
this.eventLoopGroup = new NioEventLoopGroup(); this.eventLoopGroup = new NioEventLoopGroup();
this.mqttServer = new MqttServer(); this.mqttServer = new MqttServer();
@ -68,6 +79,9 @@ public class MqttIntegrationTest {
if (this.eventLoopGroup != null) { if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
} }
if (this.handlerExecutor != null) {
this.handlerExecutor.destroy();
}
} }
@Test @Test
@ -108,9 +122,10 @@ public class MqttIntegrationTest {
private MqttClient initClient() throws Exception { private MqttClient initClient() throws Exception {
MqttClientConfig config = new MqttClientConfig(); MqttClientConfig config = new MqttClientConfig();
config.setOwnerId("MqttIntegrationTest");
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
config.setReconnectDelay(RECONNECT_DELAY_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null); MqttClient client = MqttClient.create(config, null, handlerExecutor);
client.setEventLoop(this.eventLoopGroup); client.setEventLoop(this.eventLoopGroup);
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());

View File

@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode {
} }
} }
String getOwnerId(TbContext ctx) {
return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]";
}
protected MqttClient initClient(TbContext ctx) throws Exception { protected MqttClient initClient(TbContext ctx) throws Exception {
MqttClientConfig config = new MqttClientConfig(getSslContext()); MqttClientConfig config = new MqttClientConfig(getSslContext());
config.setOwnerId(getOwnerId(ctx));
if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId()); this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());
@ -114,7 +118,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
prepareMqttClientConfig(config); prepareMqttClientConfig(config);
MqttClient client = MqttClient.create(config, null); MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
client.setEventLoop(ctx.getSharedEventLoop()); client.setEventLoop(ctx.getSharedEventLoop());
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
MqttConnectResult result; MqttConnectResult result;