ReconnectStrategyExponential jitter, max added, refactored, tested
This commit is contained in:
parent
264775ddb4
commit
83790fa0fb
@ -18,37 +18,61 @@ package org.thingsboard.mqtt;
|
|||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ReconnectStrategyExponential implements ReconnectStrategy {
|
public class ReconnectStrategyExponential implements ReconnectStrategy {
|
||||||
|
|
||||||
public static final int DEFAULT_RECONNECT_INTERVAL = 10;
|
public static final int DEFAULT_RECONNECT_INTERVAL_SEC = 10;
|
||||||
final long reconnectIntervalMinSeconds;
|
public static final int MAX_RECONNECT_INTERVAL_SEC = 60;
|
||||||
final long reconnectIntervalMaxSeconds = 30;
|
public static final int EXP_MAX = 8;
|
||||||
long lastDisconnectNanoTime = 0; //isotonic time
|
public static final long JITTER_MAX = 1;
|
||||||
int retryCount = 0;
|
private final long reconnectIntervalMinSeconds;
|
||||||
|
private final long reconnectIntervalMaxSeconds;
|
||||||
|
private long lastDisconnectNanoTime = 0; //isotonic time
|
||||||
|
private long retryCount = 0;
|
||||||
|
|
||||||
public ReconnectStrategyExponential(long reconnectIntervalMin) {
|
public ReconnectStrategyExponential(long reconnectIntervalMinSeconds) {
|
||||||
this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMin);
|
this.reconnectIntervalMaxSeconds = calculateIntervalMax(reconnectIntervalMinSeconds);
|
||||||
|
this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMinSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long calculateIntervalMin(long reconnectIntervalMin) {
|
long calculateIntervalMax(long reconnectIntervalMinSeconds) {
|
||||||
return Math.min((reconnectIntervalMin > 0 ? reconnectIntervalMin : DEFAULT_RECONNECT_INTERVAL), this.reconnectIntervalMaxSeconds);
|
return reconnectIntervalMinSeconds > MAX_RECONNECT_INTERVAL_SEC ? reconnectIntervalMinSeconds : MAX_RECONNECT_INTERVAL_SEC;
|
||||||
|
}
|
||||||
|
|
||||||
|
long calculateIntervalMin(long reconnectIntervalMinSeconds) {
|
||||||
|
return Math.min((reconnectIntervalMinSeconds > 0 ? reconnectIntervalMinSeconds : DEFAULT_RECONNECT_INTERVAL_SEC), this.reconnectIntervalMaxSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
synchronized public long getNextReconnectDelay() {
|
synchronized public long getNextReconnectDelay() {
|
||||||
final long currentNanoTime = getNanoTime();
|
final long currentNanoTime = getNanoTime();
|
||||||
final long lastDisconnectIntervalNanos = currentNanoTime - lastDisconnectNanoTime;
|
final long coolDownSpentNanos = currentNanoTime - lastDisconnectNanoTime;
|
||||||
lastDisconnectNanoTime = currentNanoTime;
|
lastDisconnectNanoTime = currentNanoTime;
|
||||||
if (TimeUnit.NANOSECONDS.toSeconds(lastDisconnectIntervalNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds) {
|
if (isCooledDown(coolDownSpentNanos)) {
|
||||||
log.debug("Reset retry counter");
|
|
||||||
retryCount = 0;
|
retryCount = 0;
|
||||||
return reconnectIntervalMinSeconds;
|
return reconnectIntervalMinSeconds;
|
||||||
}
|
}
|
||||||
return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + (1L << retryCount++));
|
return calculateNextReconnectDelay() + calculateJitter();
|
||||||
|
}
|
||||||
|
|
||||||
|
long calculateJitter() {
|
||||||
|
return ThreadLocalRandom.current().nextInt() >= 0 ? JITTER_MAX : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
long calculateNextReconnectDelay() {
|
||||||
|
return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + calculateExp(retryCount++));
|
||||||
|
}
|
||||||
|
|
||||||
|
long calculateExp(long e) {
|
||||||
|
return 1L << Math.min(e, EXP_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isCooledDown(long coolDownSpentNanos) {
|
||||||
|
return TimeUnit.NANOSECONDS.toSeconds(coolDownSpentNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getNanoTime() {
|
long getNanoTime() {
|
||||||
|
|||||||
@ -16,28 +16,80 @@
|
|||||||
package org.thingsboard.mqtt;
|
package org.thingsboard.mqtt;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.parallel.Execution;
|
||||||
import org.mockito.BDDMockito;
|
import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.data.Offset.offset;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.BDDMockito.willAnswer;
|
||||||
|
import static org.thingsboard.mqtt.ReconnectStrategyExponential.EXP_MAX;
|
||||||
|
import static org.thingsboard.mqtt.ReconnectStrategyExponential.JITTER_MAX;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class ReconnectStrategyExponentialTest {
|
class ReconnectStrategyExponentialTest {
|
||||||
|
|
||||||
@Test
|
@Execution(ExecutionMode.SAME_THREAD) // just for convenient log reading
|
||||||
public void exponentialReconnect() {
|
@ParameterizedTest
|
||||||
ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(1));
|
@ValueSource(ints = {1, 0, 60})
|
||||||
for (int i = 0; i < 10; i++) {
|
public void exponentialReconnectDelayTest(final int reconnectIntervalMinSeconds) {
|
||||||
log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay());
|
final ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(reconnectIntervalMinSeconds));
|
||||||
}
|
log.info("=== Reconnect delay test for ReconnectStrategyExponential({}) : calculated min [{}] max [{}] ===", reconnectIntervalMinSeconds, strategy.getReconnectIntervalMinSeconds(), strategy.getReconnectIntervalMaxSeconds());
|
||||||
|
final AtomicLong nanoTime = new AtomicLong(System.nanoTime());
|
||||||
|
willAnswer((x) -> nanoTime.get()).given(strategy).getNanoTime();
|
||||||
|
final LinkedBlockingDeque<Long> jittersCaptured = new LinkedBlockingDeque<>();
|
||||||
|
final LinkedBlockingDeque<Long> expCaptured = new LinkedBlockingDeque<>();
|
||||||
|
|
||||||
final long coolDownPeriod = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1;
|
willAnswer(captureResult(jittersCaptured)).given(strategy).calculateJitter();
|
||||||
|
willAnswer(captureResult(expCaptured)).given(strategy).calculateExp(anyLong());
|
||||||
|
|
||||||
BDDMockito.willAnswer((x) -> System.nanoTime() + TimeUnit.SECONDS.toNanos(coolDownPeriod)).given(strategy).getNanoTime();
|
for (int phase = 0; phase < 3; phase++) {
|
||||||
log.info("After cooldown period [{}] seconds later...", coolDownPeriod);
|
log.info("== Phase {} ==", phase);
|
||||||
for (int i = 0; i < 10; i++) {
|
long previousDelay = 0;
|
||||||
log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay());
|
for (int i = 0; i < EXP_MAX + 4; i++) {
|
||||||
|
final long nextReconnectDelay = strategy.getNextReconnectDelay();
|
||||||
|
nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(nextReconnectDelay));
|
||||||
|
log.info("Retry [{}] Delay [{}] : min [{}] exp [{}] jitter [{}]", strategy.getRetryCount(), nextReconnectDelay, strategy.getReconnectIntervalMinSeconds(), expCaptured.peekLast(), jittersCaptured.peekLast());
|
||||||
|
assertThat(previousDelay).satisfiesAnyOf(
|
||||||
|
v -> assertThat(v).isLessThanOrEqualTo(nextReconnectDelay),
|
||||||
|
v -> assertThat(v).isCloseTo(nextReconnectDelay, offset(JITTER_MAX)) // Adjust tolerance as needed
|
||||||
|
);
|
||||||
|
previousDelay = nextReconnectDelay;
|
||||||
|
}
|
||||||
|
log.info("Jitters captured: {}", drainAll(jittersCaptured));
|
||||||
|
log.info("Exponents captured: {}", drainAll(expCaptured));
|
||||||
|
assertThat(previousDelay).isCloseTo(strategy.getReconnectIntervalMaxSeconds(), offset(JITTER_MAX));
|
||||||
|
|
||||||
|
final long coolDownPeriodSec = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1;
|
||||||
|
log.info("Cooling down for [{}] seconds ...", coolDownPeriodSec);
|
||||||
|
nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(coolDownPeriodSec));
|
||||||
|
assertThat(strategy.isCooledDown(TimeUnit.SECONDS.toNanos(coolDownPeriodSec))).as("cooled down").isTrue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Answer<Long> captureResult(Collection<Long> collection) {
|
||||||
|
return invocation -> {
|
||||||
|
long result = (long) invocation.callRealMethod();
|
||||||
|
collection.add(result);
|
||||||
|
return result;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<Long> drainAll(BlockingQueue<Long> jittersCaptured) {
|
||||||
|
Collection<Long> elements = new ArrayList<>();
|
||||||
|
jittersCaptured.drainTo(elements);
|
||||||
|
return elements;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user