From 61e1ce44aa4bce2aeb28d5fd1fdfc6f811f1fec6 Mon Sep 17 00:00:00 2001 From: vzikratyi Date: Fri, 17 Jul 2020 13:09:50 +0300 Subject: [PATCH] device creation lock upgrade --- .../server/transport/mqtt/session/GatewaySessionHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index caa434e4da..fa81f584a0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -71,7 +71,7 @@ public class GatewaySessionHandler { private final TransportService transportService; private final DeviceInfoProto gateway; private final UUID sessionId; - private final Lock deviceCreationLock = new ReentrantLock(); + private final ConcurrentMap deviceCreationLockMap; private final ConcurrentMap devices; private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; @@ -86,6 +86,7 @@ public class GatewaySessionHandler { this.sessionId = sessionId; this.devices = new ConcurrentHashMap<>(); this.deviceFutures = new ConcurrentHashMap<>(); + this.deviceCreationLockMap = new ConcurrentHashMap<>(); this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); this.channel = deviceSessionCtx.getChannel(); } @@ -113,6 +114,7 @@ public class GatewaySessionHandler { private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { GatewayDeviceSessionCtx result = devices.get(deviceName); if (result == null) { + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); deviceCreationLock.lock(); try { result = devices.get(deviceName); @@ -145,6 +147,7 @@ public class GatewaySessionHandler { public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);