From 44a1fa2911b1f51830c97ea98e4c221a67c4e2ad Mon Sep 17 00:00:00 2001 From: Chi Joung So Date: Thu, 23 Aug 2018 10:02:28 +0200 Subject: [PATCH] ReconnectDelay and MaxBytesInMessage configurable --- .../thingsboard/mqtt/MqttClientConfig.java | 36 +++++++++++++++++++ .../org/thingsboard/mqtt/MqttClientImpl.java | 4 +-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index a59d83b14b..bcb8bd15ba 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -40,6 +40,8 @@ public final class MqttClientConfig { private Class channelClass = NioSocketChannel.class; private boolean reconnect = true; + private long reconnectDelay = 1L; + private int maxBytesInMessage = 8092; public MqttClientConfig() { this(null); @@ -146,4 +148,38 @@ public final class MqttClientConfig { public void setReconnect(boolean reconnect) { this.reconnect = reconnect; } + + public long getReconnectDelay() { + return reconnectDelay; + } + + /** + * Sets the reconnect delay in seconds. Defaults to 1 second. + * @param reconnectDelay + * @throws IllegalArgumentException if reconnectDelay is smaller than 1. + */ + public void setReconnectDelay(long reconnectDelay) { + if (reconnectDelay <= 0) { + throw new IllegalArgumentException("reconnectDelay must be > 0"); + } + this.reconnectDelay = reconnectDelay; + } + + public int getMaxBytesInMessage() { + return maxBytesInMessage; + } + + /** + * Sets the maximum number of bytes in the message for the {@link io.netty.handler.codec.mqtt.MqttDecoder}. + * Default value is 8092 as specified by Netty. The absolute maximum size is 256MB as set by the MQTT spec. + * + * @param maxBytesInMessage + * @throws IllegalArgumentException if maxBytesInMessage is smaller than 1 or greater than 256_000_000. + */ + public void setMaxBytesInMessage(int maxBytesInMessage) { + if (maxBytesInMessage <= 0 || maxBytesInMessage > 256_000_000) { + throw new IllegalArgumentException("maxBytesInMessage must be > 0 or < 256_000_000"); + } + this.maxBytesInMessage = maxBytesInMessage; + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index a5df846899..b9460b3555 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -155,7 +155,7 @@ final class MqttClientImpl implements MqttClient { if (reconnect) { this.reconnect = true; } - eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), 1L, TimeUnit.SECONDS); + eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), clientConfig.getReconnectDelay(), TimeUnit.SECONDS); } } @@ -512,7 +512,7 @@ final class MqttClientImpl implements MqttClient { ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port)); } - ch.pipeline().addLast("mqttDecoder", new MqttDecoder()); + ch.pipeline().addLast("mqttDecoder", new MqttDecoder(clientConfig.getMaxBytesInMessage())); ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE); ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0)); ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));