From 3e223ed8bfb8fd64b3f3cd294ab4b6f0b735f741 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 15 Jul 2025 10:31:54 +0300 Subject: [PATCH 1/2] Max client id for different mqtt protocol version --- .../server/common/data/DataConstants.java | 3 - .../mqtt/ChannelClosedException.java | 7 +- .../thingsboard/mqtt/MqttClientCallback.java | 4 +- .../thingsboard/mqtt/MqttClientConfig.java | 124 +++++------------- .../org/thingsboard/mqtt/MqttClientImpl.java | 77 +++-------- .../thingsboard/mqtt/MqttConnectResult.java | 16 +-- .../thingsboard/mqtt/MqttSubscription.java | 30 ++--- .../rule/engine/mqtt/TbMqttNode.java | 47 ++++--- .../rule/engine/mqtt/TbMqttNodeTest.java | 50 +++---- 9 files changed, 120 insertions(+), 238 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index b2d9d59cca..8a72b26a28 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -15,9 +15,6 @@ */ package org.thingsboard.server.common.data; -/** - * @author Andrew Shvayka - */ public class DataConstants { public static final String TENANT = "TENANT"; diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java index 0b50b1883a..a1987cfe98 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java @@ -15,11 +15,11 @@ */ package org.thingsboard.mqtt; -/** - * Created by Valerii Sosliuk on 12/26/2017. - */ +import java.io.Serial; + public class ChannelClosedException extends RuntimeException { + @Serial private static final long serialVersionUID = 6266638352424706909L; public ChannelClosedException() { @@ -40,4 +40,5 @@ public class ChannelClosedException extends RuntimeException { public ChannelClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } + } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java index 85b4499e36..ae5168e466 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java @@ -21,9 +21,6 @@ import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; -/** - * Created by Valerii Sosliuk on 12/30/2017. - */ public interface MqttClientCallback { /** @@ -53,4 +50,5 @@ public interface MqttClientCallback { default void onDisconnect(MqttMessage mqttDisconnectMessage) { } + } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 24feb3e58e..e2b6967f06 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -28,23 +28,46 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { + + @Getter private final SslContext sslContext; private final String randomClientId; @Getter @Setter private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes + @Nonnull + @Getter private String clientId; + @Getter private int timeoutSeconds = 60; + @Getter private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; - @Nullable private String username = null; - @Nullable private String password = null; + @Nullable + @Getter + @Setter + private String username = null; + @Nullable + @Getter + @Setter + private String password = null; + @Getter + @Setter private boolean cleanSession = true; - @Nullable private MqttLastWill lastWill; + @Nullable + @Getter + @Setter + private MqttLastWill lastWill; + @Setter + @Getter private Class channelClass = NioSocketChannel.class; + @Getter + @Setter private boolean reconnect = true; + @Getter private long reconnectDelay = 1L; + @Getter private int maxBytesInMessage = 8092; @Getter @@ -74,109 +97,37 @@ public final class MqttClientConfig { public MqttClientConfig(SslContext sslContext) { this.sslContext = sslContext; Random random = new Random(); - String id = "netty-mqtt/"; + StringBuilder id = new StringBuilder("netty-mqtt/"); String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split(""); - for(int i = 0; i < 8; i++){ - id += options[random.nextInt(options.length)]; + for (int i = 0; i < 8; i++) { + id.append(options[random.nextInt(options.length)]); } - this.clientId = id; - this.randomClientId = id; - } - - @Nonnull - public String getClientId() { - return clientId; + this.clientId = id.toString(); + this.randomClientId = id.toString(); } public void setClientId(@Nullable String clientId) { - if(clientId == null){ + if (clientId == null) { this.clientId = randomClientId; - }else{ + } else { this.clientId = clientId; } } - public int getTimeoutSeconds() { - return timeoutSeconds; - } - public void setTimeoutSeconds(int timeoutSeconds) { - if(timeoutSeconds != -1 && timeoutSeconds <= 0){ + if (timeoutSeconds != -1 && timeoutSeconds <= 0) { throw new IllegalArgumentException("timeoutSeconds must be > 0 or -1"); } this.timeoutSeconds = timeoutSeconds; } - public MqttVersion getProtocolVersion() { - return protocolVersion; - } - public void setProtocolVersion(MqttVersion protocolVersion) { - if(protocolVersion == null){ + if (protocolVersion == null) { throw new NullPointerException("protocolVersion"); } this.protocolVersion = protocolVersion; } - @Nullable - public String getUsername() { - return username; - } - - public void setUsername(@Nullable String username) { - this.username = username; - } - - @Nullable - public String getPassword() { - return password; - } - - public void setPassword(@Nullable String password) { - this.password = password; - } - - public boolean isCleanSession() { - return cleanSession; - } - - public void setCleanSession(boolean cleanSession) { - this.cleanSession = cleanSession; - } - - @Nullable - public MqttLastWill getLastWill() { - return lastWill; - } - - public void setLastWill(@Nullable MqttLastWill lastWill) { - this.lastWill = lastWill; - } - - public Class getChannelClass() { - return channelClass; - } - - public void setChannelClass(Class channelClass) { - this.channelClass = channelClass; - } - - public SslContext getSslContext() { - return sslContext; - } - - public boolean isReconnect() { - return reconnect; - } - - public void setReconnect(boolean reconnect) { - this.reconnect = reconnect; - } - - public long getReconnectDelay() { - return reconnectDelay; - } - /** * Sets the reconnect delay in seconds. Defaults to 1 second. * @param reconnectDelay @@ -189,10 +140,6 @@ public final class MqttClientConfig { this.reconnectDelay = reconnectDelay; } - public int getMaxBytesInMessage() { - return maxBytesInMessage; - } - /** * Sets the maximum number of bytes in the message for the {@link io.netty.handler.codec.mqtt.MqttDecoder}. * Default value is 8092 as specified by Netty. The absolute maximum size is 256MB as set by the MQTT spec. @@ -206,4 +153,5 @@ public final class MqttClientConfig { } this.maxBytesInMessage = maxBytesInMessage; } + } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 801470284b..aec63224dd 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,7 +46,9 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import lombok.AccessLevel; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ListeningExecutor; @@ -67,18 +69,28 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j final class MqttClientImpl implements MqttClient { + @Getter(AccessLevel.PACKAGE) private final Set serverSubscriptions = new HashSet<>(); + @Getter(AccessLevel.PACKAGE) private final ConcurrentMap pendingServerUnsubscribes = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PACKAGE) private final ConcurrentMap qos2PendingIncomingPublishes = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PACKAGE) private final ConcurrentMap pendingPublishes = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PACKAGE) private final HashMultimap subscriptions = HashMultimap.create(); + @Getter(AccessLevel.PACKAGE) private final ConcurrentMap pendingSubscriptions = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PACKAGE) private final Set pendingSubscribeTopics = new HashSet<>(); + @Getter(AccessLevel.PACKAGE) private final HashMultimap handlerToSubscription = HashMultimap.create(); private final AtomicInteger nextMessageId = new AtomicInteger(1); + @Getter private final MqttClientConfig clientConfig; + @Getter(AccessLevel.PACKAGE) private final MqttHandler defaultHandler; private final ReconnectStrategy reconnectStrategy; @@ -88,12 +100,15 @@ final class MqttClientImpl implements MqttClient { private volatile Channel channel; private volatile boolean disconnected = false; + @Getter private volatile boolean reconnect = false; private String host; private int port; @Getter + @Setter private MqttClientCallback callback; + @Getter private final ListeningExecutor handlerExecutor; private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1; @@ -240,11 +255,6 @@ final class MqttClientImpl implements MqttClient { this.eventLoop = eventLoop; } - @Override - public ListeningExecutor getHandlerExecutor() { - return this.handlerExecutor; - } - /** * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler * @@ -446,16 +456,6 @@ final class MqttClientImpl implements MqttClient { return future; } - /** - * Retrieve the MqttClient configuration - * - * @return The {@link MqttClientConfig} instance we use - */ - @Override - public MqttClientConfig getClientConfig() { - return clientConfig; - } - @Override public void disconnect() { if (disconnected) { @@ -480,25 +480,15 @@ final class MqttClientImpl implements MqttClient { } } - @Override - public void setCallback(MqttClientCallback callback) { - this.callback = callback; - } - ///////////////////////////////////////////// PRIVATE API ///////////////////////////////////////////// - public boolean isReconnect() { - return reconnect; - } - public void onSuccessfulReconnect() { if (callback != null) { callback.onSuccessfulReconnect(); } } - ChannelFuture sendAndFlushPacket(Object message) { if (this.channel == null) { return null; @@ -576,7 +566,7 @@ final class MqttClientImpl implements MqttClient { } private void checkSubscriptions(String topic, Promise promise) { - if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) { + if (!(this.subscriptions.containsKey(topic) && !this.subscriptions.get(topic).isEmpty()) && this.serverSubscriptions.contains(topic)) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = getNewMessageId(); MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); @@ -614,38 +604,6 @@ final class MqttClientImpl implements MqttClient { } } - ConcurrentMap getPendingSubscriptions() { - return pendingSubscriptions; - } - - HashMultimap getSubscriptions() { - return subscriptions; - } - - Set getPendingSubscribeTopics() { - return pendingSubscribeTopics; - } - - HashMultimap getHandlerToSubscription() { - return handlerToSubscription; - } - - Set getServerSubscriptions() { - return serverSubscriptions; - } - - ConcurrentMap getPendingServerUnsubscribes() { - return pendingServerUnsubscribes; - } - - ConcurrentMap getPendingPublishes() { - return pendingPublishes; - } - - ConcurrentMap getQos2PendingIncomingPublishes() { - return qos2PendingIncomingPublishes; - } - private class MqttChannelInitializer extends ChannelInitializer { private final Promise connectFuture; @@ -673,10 +631,7 @@ final class MqttClientImpl implements MqttClient { ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds())); ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture)); } - } - MqttHandler getDefaultHandler() { - return defaultHandler; } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java index 67757d2a7a..909955d062 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java @@ -17,14 +17,18 @@ package org.thingsboard.mqtt; import io.netty.channel.ChannelFuture; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import lombok.Getter; import lombok.ToString; @ToString @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttConnectResult { + @Getter private final boolean success; + @Getter private final MqttConnectReturnCode returnCode; + @Getter private final ChannelFuture closeFuture; MqttConnectResult(boolean success, MqttConnectReturnCode returnCode, ChannelFuture closeFuture) { @@ -33,16 +37,4 @@ public final class MqttConnectResult { this.closeFuture = closeFuture; } - public boolean isSuccess() { - return success; - } - - public MqttConnectReturnCode getReturnCode() { - return returnCode; - } - - public ChannelFuture getCloseFuture() { - return closeFuture; - } - } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 7ad93462ab..d5125757da 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -15,16 +15,23 @@ */ package org.thingsboard.mqtt; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + import java.util.regex.Pattern; final class MqttSubscription { + @Getter(AccessLevel.PACKAGE) private final String topic; private final Pattern topicRegex; + @Getter private final MqttHandler handler; - + @Getter(AccessLevel.PACKAGE) private final boolean once; - + @Getter(AccessLevel.PACKAGE) + @Setter(AccessLevel.PACKAGE) private volatile boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { @@ -40,22 +47,6 @@ final class MqttSubscription { this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$"); } - String getTopic() { - return topic; - } - - public MqttHandler getHandler() { - return handler; - } - - boolean isOnce() { - return once; - } - - boolean isCalled() { - return called; - } - boolean matches(String topic) { return this.topicRegex.matcher(topic).matches(); } @@ -78,7 +69,4 @@ final class MqttSubscription { return result; } - void setCalled(boolean called) { - this.called = called; - } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 694fed1bf8..87643ae46d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -45,7 +45,6 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.net.ssl.SSLException; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,12 +63,10 @@ import java.util.concurrent.TimeoutException; ) public class TbMqttNode extends TbAbstractExternalNode { - private static final Charset UTF8 = StandardCharsets.UTF_8; - - private static final String ERROR = "error"; + private static final int MQTT_3_MAX_CLIENT_ID_LENGTH = 23; + private static final int MQTT_5_MAX_CLIENT_ID_LENGTH = 256; protected TbMqttNodeConfiguration mqttNodeConfiguration; - protected MqttClient mqttClient; @Override @@ -87,9 +84,9 @@ public class TbMqttNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg); + String topic = TbNodeUtils.processPattern(mqttNodeConfiguration.getTopicPattern(), msg); var tbMsg = ackIfNeeded(ctx, msg); - this.mqttClient.publish(topic, Unpooled.wrappedBuffer(getData(tbMsg, mqttNodeConfiguration.isParseToPlainText()).getBytes(UTF8)), + this.mqttClient.publish(topic, Unpooled.wrappedBuffer(getData(tbMsg, mqttNodeConfiguration.isParseToPlainText()).getBytes(StandardCharsets.UTF_8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) .addListener(future -> { if (future.isSuccess()) { @@ -103,7 +100,7 @@ public class TbMqttNode extends TbAbstractExternalNode { private TbMsg processException(TbMsg origMsg, Throwable e) { TbMsgMetaData metaData = origMsg.getMetaData().copy(); - metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage()); + metaData.putValue("error", e.getClass() + ": " + e.getMessage()); return origMsg.transform() .metaData(metaData) .build(); @@ -111,8 +108,8 @@ public class TbMqttNode extends TbAbstractExternalNode { @Override public void destroy() { - if (this.mqttClient != null) { - this.mqttClient.disconnect(); + if (mqttClient != null) { + mqttClient.disconnect(); } } @@ -123,11 +120,11 @@ public class TbMqttNode extends TbAbstractExternalNode { protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); config.setOwnerId(getOwnerId(ctx)); - if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { + if (!StringUtils.isEmpty(mqttNodeConfiguration.getClientId())) { config.setClientId(getClientId(ctx)); } - config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); - config.setProtocolVersion(this.mqttNodeConfiguration.getProtocolVersion()); + config.setCleanSession(mqttNodeConfiguration.isCleanSession()); + config.setProtocolVersion(mqttNodeConfiguration.getProtocolVersion()); MqttClientSettings mqttClientSettings = ctx.getMqttClientSettings(); config.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig( @@ -139,32 +136,32 @@ public class TbMqttNode extends TbAbstractExternalNode { prepareMqttClientConfig(config); MqttClient client = getMqttClient(ctx, config); client.setEventLoop(ctx.getSharedEventLoop()); - Promise connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); + Promise connectFuture = client.connect(mqttNodeConfiguration.getHost(), mqttNodeConfiguration.getPort()); MqttConnectResult result; try { - result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS); + result = connectFuture.get(mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS); } catch (TimeoutException ex) { connectFuture.cancel(true); client.disconnect(); - String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort(); + String hostPort = mqttNodeConfiguration.getHost() + ":" + mqttNodeConfiguration.getPort(); throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort)); } if (!result.isSuccess()) { connectFuture.cancel(true); client.disconnect(); - String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort(); + String hostPort = mqttNodeConfiguration.getHost() + ":" + mqttNodeConfiguration.getPort(); throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode())); } return client; } private String getClientId(TbContext ctx) throws TbNodeException { - String clientId = this.mqttNodeConfiguration.isAppendClientIdSuffix() ? - this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : - this.mqttNodeConfiguration.getClientId(); - if (clientId.length() > 23) { - throw new TbNodeException("Client ID is too long '" + clientId + "'. " + - "The length of Client ID cannot be longer than 23, but current length is " + clientId.length() + ".", true); + String clientId = mqttNodeConfiguration.isAppendClientIdSuffix() ? + mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : + mqttNodeConfiguration.getClientId(); + int maxLength = mqttNodeConfiguration.getProtocolVersion() == MqttVersion.MQTT_3_1 ? MQTT_3_MAX_CLIENT_ID_LENGTH : MQTT_5_MAX_CLIENT_ID_LENGTH; + if (clientId.length() > maxLength) { + throw new TbNodeException("The length of Client ID cannot be longer than " + maxLength + ", but current length is " + clientId.length() + ".", true); } return clientId; } @@ -174,7 +171,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } protected void prepareMqttClientConfig(MqttClientConfig config) { - ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); + ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); if (credentials.getType() == CredentialsType.BASIC) { BasicCredentials basicCredentials = (BasicCredentials) credentials; config.setUsername(basicCredentials.getUsername()); @@ -183,7 +180,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } private SslContext getSslContext() throws SSLException { - return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; + return mqttNodeConfiguration.isSsl() ? mqttNodeConfiguration.getCredentials().initSslContext() : null; } private String getData(TbMsg tbMsg, boolean parseToPlainText) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index 1e877b80dc..3725dafa5e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -212,40 +212,46 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { assertThatNoException().isThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))); } - @Test - public void givenClientIdIsTooLong_whenInit_thenThrowsException() { - String invalidClientId = "vhfrbeb38ygwfwrgfwefgterhytjytj"; - mqttNodeConfig.setClientId(invalidClientId); + @ParameterizedTest + @MethodSource("provideInvalidClientIdScenarios") + public void givenInvalidClientId_whenInit_thenThrowsException(MqttVersion version, int maxLength, int repeat, String serviceId, boolean appendSuffix) { + String baseClientId = "x".repeat(repeat); + mqttNodeConfig.setClientId(baseClientId); + mqttNodeConfig.setAppendClientIdSuffix(appendSuffix); + mqttNodeConfig.setProtocolVersion(version); given(ctxMock.getTenantId()).willReturn(TENANT_ID); given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); + String clientId = appendSuffix ? baseClientId + "_" + serviceId : baseClientId; + if (appendSuffix) { + given(ctxMock.getServiceId()).willReturn(serviceId); + } + + String expectedMessage = "Client ID is too long '" + clientId + "'. " + + "The length of Client ID cannot be longer than " + maxLength + ", but current length is " + clientId.length() + "."; + assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) .isInstanceOf(TbNodeException.class) - .hasMessage("Client ID is too long '" + invalidClientId + "'. " + - "The length of Client ID cannot be longer than 23, but current length is " + invalidClientId.length() + ".") + .hasMessage(expectedMessage) .extracting(e -> ((TbNodeException) e).isUnrecoverable()) .isEqualTo(true); } - @Test - public void givenClientIdIsOkAndAppendClientIdSuffixIsTrue_whenInit_thenClientIdBecomesInvalidAndThrowsException() { - String validClientId = "fertjnhnjj4ge"; - mqttNodeConfig.setClientId("fertjnhnjj4ge"); - mqttNodeConfig.setAppendClientIdSuffix(true); + private static Stream provideInvalidClientIdScenarios() { + return Stream.of( + // MQTT_5, too long clientId + Arguments.of(MqttVersion.MQTT_5, 256, 257, null, false), - given(ctxMock.getTenantId()).willReturn(TENANT_ID); - given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); - String serviceId = "test-service"; - given(ctxMock.getServiceId()).willReturn(serviceId); + // MQTT_5, base + suffix exceeds + Arguments.of(MqttVersion.MQTT_5, 256, 250, "test-service", true), - String resultedClientId = validClientId + "_" + serviceId; - assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) - .isInstanceOf(TbNodeException.class) - .hasMessage("Client ID is too long '" + resultedClientId + "'. " + - "The length of Client ID cannot be longer than 23, but current length is " + resultedClientId.length() + ".") - .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(true); + // MQTT_3_1, too long clientId + Arguments.of(MqttVersion.MQTT_3_1, 23, 24, null, false), + + // MQTT_3_1, base + suffix exceeds + Arguments.of(MqttVersion.MQTT_3_1, 23, 5, "verylongservicename", true) + ); } @Test From 233d883d4cf4592075ac95db0059891d2b41e508 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 15 Jul 2025 10:38:09 +0300 Subject: [PATCH 2/2] Test: givenInvalidClientId_whenInit_thenThrowsException --- .../java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index 3725dafa5e..5e93f6910f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -228,8 +228,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { given(ctxMock.getServiceId()).willReturn(serviceId); } - String expectedMessage = "Client ID is too long '" + clientId + "'. " + - "The length of Client ID cannot be longer than " + maxLength + ", but current length is " + clientId.length() + "."; + String expectedMessage = "The length of Client ID cannot be longer than " + maxLength + ", but current length is " + clientId.length() + "."; assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) .isInstanceOf(TbNodeException.class)