diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 47eae565dc..344886165e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -80,6 +80,8 @@ final class MqttClientImpl implements MqttClient { private final MqttHandler defaultHandler; + private final ReconnectStrategy reconnectStrategy; + private EventLoopGroup eventLoop; private volatile Channel channel; @@ -110,6 +112,7 @@ final class MqttClientImpl implements MqttClient { this.clientConfig = clientConfig; this.defaultHandler = defaultHandler; this.handlerExecutor = handlerExecutor; + this.reconnectStrategy = new ReconnectStrategyExponential(getClientConfig().getReconnectDelay()); } /** @@ -191,7 +194,7 @@ final class MqttClientImpl implements MqttClient { if (reconnect) { this.reconnect = true; } - eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), clientConfig.getReconnectDelay(), TimeUnit.SECONDS); + eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), reconnectStrategy.getNextReconnectDelay(), TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategy.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategy.java new file mode 100644 index 0000000000..f924b79ca5 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategy.java @@ -0,0 +1,21 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +@FunctionalInterface +public interface ReconnectStrategy { + long getNextReconnectDelay(); +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java new file mode 100644 index 0000000000..dbd11f91e1 --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +@Getter +@Slf4j +public class ReconnectStrategyExponential implements ReconnectStrategy { + + public static final int DEFAULT_RECONNECT_INTERVAL = 10; + final long reconnectIntervalMinSeconds; + final long reconnectIntervalMaxSeconds = 30; + long lastDisconnectNanoTime = 0; //isotonic time + int retryCount = 0; + + public ReconnectStrategyExponential(long reconnectIntervalMin) { + this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMin); + } + + private long calculateIntervalMin(long reconnectIntervalMin) { + return Math.min((reconnectIntervalMin > 0 ? reconnectIntervalMin : DEFAULT_RECONNECT_INTERVAL), this.reconnectIntervalMaxSeconds); + } + + @Override + synchronized public long getNextReconnectDelay() { + final long currentNanoTime = getNanoTime(); + final long lastDisconnectIntervalNanos = currentNanoTime - lastDisconnectNanoTime; + lastDisconnectNanoTime = currentNanoTime; + if (TimeUnit.NANOSECONDS.toSeconds(lastDisconnectIntervalNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds) { + log.debug("Reset retry counter"); + retryCount = 0; + return reconnectIntervalMinSeconds; + } + return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + (1L << retryCount++)); + } + + long getNanoTime() { + return System.nanoTime(); + } + +} diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java new file mode 100644 index 0000000000..828cfae730 --- /dev/null +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.mockito.BDDMockito; +import org.mockito.Mockito; + +import java.util.concurrent.TimeUnit; + +@Slf4j +class ReconnectStrategyExponentialTest { + + @Test + public void exponentialReconnect() { + ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(1)); + for (int i = 0; i < 10; i++) { + log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay()); + } + + final long coolDownPeriod = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1; + + BDDMockito.willAnswer((x) -> System.nanoTime() + TimeUnit.SECONDS.toNanos(coolDownPeriod)).given(strategy).getNanoTime(); + log.info("After cooldown period [{}] seconds later...", coolDownPeriod); + for (int i = 0; i < 10; i++) { + log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay()); + } + } +} \ No newline at end of file