From 83790fa0fb6a208ca872a7c9cb704d2e8dab80e6 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 20 Mar 2025 11:02:52 +0100 Subject: [PATCH] ReconnectStrategyExponential jitter, max added, refactored, tested --- .../mqtt/ReconnectStrategyExponential.java | 50 +++++++++--- .../ReconnectStrategyExponentialTest.java | 80 +++++++++++++++---- 2 files changed, 103 insertions(+), 27 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java index dbd11f91e1..e0b83c6ab7 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ReconnectStrategyExponential.java @@ -18,37 +18,61 @@ package org.thingsboard.mqtt; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @Getter @Slf4j public class ReconnectStrategyExponential implements ReconnectStrategy { - public static final int DEFAULT_RECONNECT_INTERVAL = 10; - final long reconnectIntervalMinSeconds; - final long reconnectIntervalMaxSeconds = 30; - long lastDisconnectNanoTime = 0; //isotonic time - int retryCount = 0; + public static final int DEFAULT_RECONNECT_INTERVAL_SEC = 10; + public static final int MAX_RECONNECT_INTERVAL_SEC = 60; + public static final int EXP_MAX = 8; + public static final long JITTER_MAX = 1; + private final long reconnectIntervalMinSeconds; + private final long reconnectIntervalMaxSeconds; + private long lastDisconnectNanoTime = 0; //isotonic time + private long retryCount = 0; - public ReconnectStrategyExponential(long reconnectIntervalMin) { - this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMin); + public ReconnectStrategyExponential(long reconnectIntervalMinSeconds) { + this.reconnectIntervalMaxSeconds = calculateIntervalMax(reconnectIntervalMinSeconds); + this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMinSeconds); } - private long calculateIntervalMin(long reconnectIntervalMin) { - return Math.min((reconnectIntervalMin > 0 ? reconnectIntervalMin : DEFAULT_RECONNECT_INTERVAL), this.reconnectIntervalMaxSeconds); + long calculateIntervalMax(long reconnectIntervalMinSeconds) { + return reconnectIntervalMinSeconds > MAX_RECONNECT_INTERVAL_SEC ? reconnectIntervalMinSeconds : MAX_RECONNECT_INTERVAL_SEC; + } + + long calculateIntervalMin(long reconnectIntervalMinSeconds) { + return Math.min((reconnectIntervalMinSeconds > 0 ? reconnectIntervalMinSeconds : DEFAULT_RECONNECT_INTERVAL_SEC), this.reconnectIntervalMaxSeconds); } @Override synchronized public long getNextReconnectDelay() { final long currentNanoTime = getNanoTime(); - final long lastDisconnectIntervalNanos = currentNanoTime - lastDisconnectNanoTime; + final long coolDownSpentNanos = currentNanoTime - lastDisconnectNanoTime; lastDisconnectNanoTime = currentNanoTime; - if (TimeUnit.NANOSECONDS.toSeconds(lastDisconnectIntervalNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds) { - log.debug("Reset retry counter"); + if (isCooledDown(coolDownSpentNanos)) { retryCount = 0; return reconnectIntervalMinSeconds; } - return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + (1L << retryCount++)); + return calculateNextReconnectDelay() + calculateJitter(); + } + + long calculateJitter() { + return ThreadLocalRandom.current().nextInt() >= 0 ? JITTER_MAX : 0; + } + + long calculateNextReconnectDelay() { + return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + calculateExp(retryCount++)); + } + + long calculateExp(long e) { + return 1L << Math.min(e, EXP_MAX); + } + + boolean isCooledDown(long coolDownSpentNanos) { + return TimeUnit.NANOSECONDS.toSeconds(coolDownSpentNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds; } long getNanoTime() { diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java index 828cfae730..085dcef3a6 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/ReconnectStrategyExponentialTest.java @@ -16,28 +16,80 @@ package org.thingsboard.mqtt; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; -import org.mockito.BDDMockito; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.data.Offset.offset; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.BDDMockito.willAnswer; +import static org.thingsboard.mqtt.ReconnectStrategyExponential.EXP_MAX; +import static org.thingsboard.mqtt.ReconnectStrategyExponential.JITTER_MAX; @Slf4j class ReconnectStrategyExponentialTest { - @Test - public void exponentialReconnect() { - ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(1)); - for (int i = 0; i < 10; i++) { - log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay()); - } + @Execution(ExecutionMode.SAME_THREAD) // just for convenient log reading + @ParameterizedTest + @ValueSource(ints = {1, 0, 60}) + public void exponentialReconnectDelayTest(final int reconnectIntervalMinSeconds) { + final ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(reconnectIntervalMinSeconds)); + log.info("=== Reconnect delay test for ReconnectStrategyExponential({}) : calculated min [{}] max [{}] ===", reconnectIntervalMinSeconds, strategy.getReconnectIntervalMinSeconds(), strategy.getReconnectIntervalMaxSeconds()); + final AtomicLong nanoTime = new AtomicLong(System.nanoTime()); + willAnswer((x) -> nanoTime.get()).given(strategy).getNanoTime(); + final LinkedBlockingDeque jittersCaptured = new LinkedBlockingDeque<>(); + final LinkedBlockingDeque expCaptured = new LinkedBlockingDeque<>(); - final long coolDownPeriod = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1; + willAnswer(captureResult(jittersCaptured)).given(strategy).calculateJitter(); + willAnswer(captureResult(expCaptured)).given(strategy).calculateExp(anyLong()); - BDDMockito.willAnswer((x) -> System.nanoTime() + TimeUnit.SECONDS.toNanos(coolDownPeriod)).given(strategy).getNanoTime(); - log.info("After cooldown period [{}] seconds later...", coolDownPeriod); - for (int i = 0; i < 10; i++) { - log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay()); + for (int phase = 0; phase < 3; phase++) { + log.info("== Phase {} ==", phase); + long previousDelay = 0; + for (int i = 0; i < EXP_MAX + 4; i++) { + final long nextReconnectDelay = strategy.getNextReconnectDelay(); + nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(nextReconnectDelay)); + log.info("Retry [{}] Delay [{}] : min [{}] exp [{}] jitter [{}]", strategy.getRetryCount(), nextReconnectDelay, strategy.getReconnectIntervalMinSeconds(), expCaptured.peekLast(), jittersCaptured.peekLast()); + assertThat(previousDelay).satisfiesAnyOf( + v -> assertThat(v).isLessThanOrEqualTo(nextReconnectDelay), + v -> assertThat(v).isCloseTo(nextReconnectDelay, offset(JITTER_MAX)) // Adjust tolerance as needed + ); + previousDelay = nextReconnectDelay; + } + log.info("Jitters captured: {}", drainAll(jittersCaptured)); + log.info("Exponents captured: {}", drainAll(expCaptured)); + assertThat(previousDelay).isCloseTo(strategy.getReconnectIntervalMaxSeconds(), offset(JITTER_MAX)); + + final long coolDownPeriodSec = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1; + log.info("Cooling down for [{}] seconds ...", coolDownPeriodSec); + nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(coolDownPeriodSec)); + assertThat(strategy.isCooledDown(TimeUnit.SECONDS.toNanos(coolDownPeriodSec))).as("cooled down").isTrue(); } } -} \ No newline at end of file + + private Answer captureResult(Collection collection) { + return invocation -> { + long result = (long) invocation.callRealMethod(); + collection.add(result); + return result; + }; + } + + private Collection drainAll(BlockingQueue jittersCaptured) { + Collection elements = new ArrayList<>(); + jittersCaptured.drainTo(elements); + return elements; + } + +}