diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 65393d6939..caa434e4da 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -54,6 +54,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Created by ashvayka on 19.01.17. @@ -69,7 +71,9 @@ public class GatewaySessionHandler { private final TransportService transportService; private final DeviceInfoProto gateway; private final UUID sessionId; - private final Map devices; + private final Lock deviceCreationLock = new ReentrantLock(); + private final ConcurrentMap devices; + private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; private final ChannelHandlerContext channel; private final DeviceSessionCtx deviceSessionCtx; @@ -81,6 +85,7 @@ public class GatewaySessionHandler { this.gateway = deviceSessionCtx.getDeviceInfo(); this.sessionId = sessionId; this.devices = new ConcurrentHashMap<>(); + this.deviceFutures = new ConcurrentHashMap<>(); this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); this.channel = deviceSessionCtx.getChannel(); } @@ -106,37 +111,70 @@ public class GatewaySessionHandler { } private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { - SettableFuture future = SettableFuture.create(); GatewayDeviceSessionCtx result = devices.get(deviceName); if (result == null) { - transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() - .setDeviceName(deviceName) - .setDeviceType(deviceType) - .setGatewayIdMSB(gateway.getDeviceIdMSB()) - .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(), - new TransportServiceCallback() { - @Override - public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); - if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { - SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); - transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); - } - future.set(devices.get(deviceName)); - } - - @Override - public void onError(Throwable e) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); - future.setException(e); - } - }); + deviceCreationLock.lock(); + try { + result = devices.get(deviceName); + if (result == null) { + return getDeviceCreationFuture(deviceName, deviceType); + } else { + return toCompletedFuture(result); + } + } finally { + deviceCreationLock.unlock(); + } } else { - future.set(result); + return toCompletedFuture(result); } + } + + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + SettableFuture future = deviceFutures.get(deviceName); + if (future == null) { + final SettableFuture futureToSet = SettableFuture.create(); + deviceFutures.put(deviceName, futureToSet); + try { + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .setGatewayIdMSB(gateway.getDeviceIdMSB()) + .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(), + new TransportServiceCallback() { + @Override + public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); + transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); + } + futureToSet.set(devices.get(deviceName)); + deviceFutures.remove(deviceName); + } + + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); + futureToSet.setException(e); + deviceFutures.remove(deviceName); + } + }); + return futureToSet; + } catch (Throwable e) { + deviceFutures.remove(deviceName); + throw e; + } + } else { + return future; + } + } + + private ListenableFuture toCompletedFuture(GatewayDeviceSessionCtx result) { + SettableFuture future = SettableFuture.create(); + future.set(result); return future; }