MQTT client reconnect strategy exponential
This commit is contained in:
		
							parent
							
								
									8615dd4ce5
								
							
						
					
					
						commit
						264775ddb4
					
				@ -80,6 +80,8 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
 | 
			
		||||
    private final MqttHandler defaultHandler;
 | 
			
		||||
 | 
			
		||||
    private final ReconnectStrategy reconnectStrategy;
 | 
			
		||||
 | 
			
		||||
    private EventLoopGroup eventLoop;
 | 
			
		||||
 | 
			
		||||
    private volatile Channel channel;
 | 
			
		||||
@ -110,6 +112,7 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
        this.clientConfig = clientConfig;
 | 
			
		||||
        this.defaultHandler = defaultHandler;
 | 
			
		||||
        this.handlerExecutor = handlerExecutor;
 | 
			
		||||
        this.reconnectStrategy = new ReconnectStrategyExponential(getClientConfig().getReconnectDelay());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@ -191,7 +194,7 @@ final class MqttClientImpl implements MqttClient {
 | 
			
		||||
            if (reconnect) {
 | 
			
		||||
                this.reconnect = true;
 | 
			
		||||
            }
 | 
			
		||||
            eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), clientConfig.getReconnectDelay(), TimeUnit.SECONDS);
 | 
			
		||||
            eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), reconnectStrategy.getNextReconnectDelay(), TimeUnit.SECONDS);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,21 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.mqtt;
 | 
			
		||||
 | 
			
		||||
@FunctionalInterface
 | 
			
		||||
public interface ReconnectStrategy {
 | 
			
		||||
    long getNextReconnectDelay();
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,58 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.mqtt;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Getter
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class ReconnectStrategyExponential implements ReconnectStrategy {
 | 
			
		||||
 | 
			
		||||
    public static final int DEFAULT_RECONNECT_INTERVAL = 10;
 | 
			
		||||
    final long reconnectIntervalMinSeconds;
 | 
			
		||||
    final long reconnectIntervalMaxSeconds = 30;
 | 
			
		||||
    long lastDisconnectNanoTime = 0; //isotonic time
 | 
			
		||||
    int retryCount = 0;
 | 
			
		||||
 | 
			
		||||
    public ReconnectStrategyExponential(long reconnectIntervalMin) {
 | 
			
		||||
        this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMin);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private long calculateIntervalMin(long reconnectIntervalMin) {
 | 
			
		||||
        return Math.min((reconnectIntervalMin > 0 ? reconnectIntervalMin : DEFAULT_RECONNECT_INTERVAL), this.reconnectIntervalMaxSeconds);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    synchronized public long getNextReconnectDelay() {
 | 
			
		||||
        final long currentNanoTime = getNanoTime();
 | 
			
		||||
        final long lastDisconnectIntervalNanos = currentNanoTime - lastDisconnectNanoTime;
 | 
			
		||||
        lastDisconnectNanoTime = currentNanoTime;
 | 
			
		||||
        if (TimeUnit.NANOSECONDS.toSeconds(lastDisconnectIntervalNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds) {
 | 
			
		||||
            log.debug("Reset retry counter");
 | 
			
		||||
            retryCount = 0;
 | 
			
		||||
            return reconnectIntervalMinSeconds;
 | 
			
		||||
        }
 | 
			
		||||
        return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + (1L << retryCount++));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    long getNanoTime() {
 | 
			
		||||
        return System.nanoTime();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,43 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.mqtt;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.mockito.BDDMockito;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
class ReconnectStrategyExponentialTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void exponentialReconnect() {
 | 
			
		||||
        ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(1));
 | 
			
		||||
        for (int i = 0; i < 10; i++) {
 | 
			
		||||
            log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        final long coolDownPeriod = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1;
 | 
			
		||||
 | 
			
		||||
        BDDMockito.willAnswer((x) -> System.nanoTime() + TimeUnit.SECONDS.toNanos(coolDownPeriod)).given(strategy).getNanoTime();
 | 
			
		||||
        log.info("After cooldown period [{}] seconds later...", coolDownPeriod);
 | 
			
		||||
        for (int i = 0; i < 10; i++) {
 | 
			
		||||
            log.info("Disconnect [{}] Delay [{}]", i, strategy.getNextReconnectDelay());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user