diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index caa434e4da..fa81f584a0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -71,7 +71,7 @@ public class GatewaySessionHandler { private final TransportService transportService; private final DeviceInfoProto gateway; private final UUID sessionId; - private final Lock deviceCreationLock = new ReentrantLock(); + private final ConcurrentMap deviceCreationLockMap; private final ConcurrentMap devices; private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; @@ -86,6 +86,7 @@ public class GatewaySessionHandler { this.sessionId = sessionId; this.devices = new ConcurrentHashMap<>(); this.deviceFutures = new ConcurrentHashMap<>(); + this.deviceCreationLockMap = new ConcurrentHashMap<>(); this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); this.channel = deviceSessionCtx.getChannel(); } @@ -113,6 +114,7 @@ public class GatewaySessionHandler { private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { GatewayDeviceSessionCtx result = devices.get(deviceName); if (result == null) { + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); deviceCreationLock.lock(); try { result = devices.get(deviceName); @@ -145,6 +147,7 @@ public class GatewaySessionHandler { public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);