diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 193a7d24f5..dd89bef96a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -34,6 +34,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; +import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; @@ -66,6 +67,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType; + /** * Created by ashvayka on 19.01.17. */ @@ -82,7 +85,7 @@ public class GatewaySessionHandler { private final UUID sessionId; private final ConcurrentMap deviceCreationLockMap; private final ConcurrentMap devices; - private final ConcurrentMap> deviceFutures; + private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; private final ChannelHandlerContext channel; private final DeviceSessionCtx deviceSessionCtx; @@ -100,6 +103,10 @@ public class GatewaySessionHandler { this.channel = deviceSessionCtx.getChannel(); } + ConcurrentReferenceHashMap createWeakMap() { + return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK); + } + public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { if (isJsonPayloadType()) { onDeviceConnectJson(mqttMsg); @@ -228,21 +235,22 @@ public class GatewaySessionHandler { if (result == null) { return getDeviceCreationFuture(deviceName, deviceType); } else { - return toCompletedFuture(result); + return Futures.immediateFuture(result); } } finally { deviceCreationLock.unlock(); } } else { - return toCompletedFuture(result); + return Futures.immediateFuture(result); } } private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { - SettableFuture future = deviceFutures.get(deviceName); - if (future == null) { - final SettableFuture futureToSet = SettableFuture.create(); - deviceFutures.put(deviceName, futureToSet); + final SettableFuture futureToSet = SettableFuture.create(); + ListenableFuture future = deviceFutures.putIfAbsent(deviceName, futureToSet); + if (future != null) { + return future; + } try { transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() .setDeviceName(deviceName) @@ -282,15 +290,6 @@ public class GatewaySessionHandler { deviceFutures.remove(deviceName); throw e; } - } else { - return future; - } - } - - private ListenableFuture toCompletedFuture(GatewayDeviceSessionCtx result) { - SettableFuture future = SettableFuture.create(); - future.set(result); - return future; } private int getMsgId(MqttPublishMessage mqttMsg) {