From a38ee80e0957d52bc21f0a201b081edf4d3420f2 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 29 Apr 2020 10:02:00 +0300 Subject: [PATCH] Refactoring of device creation using gateway --- .../mqtt/session/GatewaySessionHandler.java | 104 +++++++++++------- 1 file changed, 62 insertions(+), 42 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 5dae247f1c..caa434e4da 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -54,6 +54,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Created by ashvayka on 19.01.17. @@ -69,6 +71,7 @@ public class GatewaySessionHandler { private final TransportService transportService; private final DeviceInfoProto gateway; private final UUID sessionId; + private final Lock deviceCreationLock = new ReentrantLock(); private final ConcurrentMap devices; private final ConcurrentMap> deviceFutures; private final ConcurrentMap mqttQoSMap; @@ -108,53 +111,70 @@ public class GatewaySessionHandler { } private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { - SettableFuture future; GatewayDeviceSessionCtx result = devices.get(deviceName); if (result == null) { - synchronized (deviceFutures) { - future = deviceFutures.get(deviceName); - if (future == null) { - final SettableFuture futureToSet = SettableFuture.create(); - deviceFutures.put(deviceName, futureToSet); - future = futureToSet; - try { - transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() - .setDeviceName(deviceName) - .setDeviceType(deviceType) - .setGatewayIdMSB(gateway.getDeviceIdMSB()) - .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(), - new TransportServiceCallback() { - @Override - public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); - if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { - SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); - transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); - } - futureToSet.set(devices.get(deviceName)); - deviceFutures.remove(deviceName); - } - - @Override - public void onError(Throwable e) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); - futureToSet.setException(e); - deviceFutures.remove(deviceName); - } - }); - } catch (Throwable e) { - deviceFutures.remove(deviceName); - throw e; - } + deviceCreationLock.lock(); + try { + result = devices.get(deviceName); + if (result == null) { + return getDeviceCreationFuture(deviceName, deviceType); + } else { + return toCompletedFuture(result); } + } finally { + deviceCreationLock.unlock(); } } else { - future = SettableFuture.create(); - future.set(result); + return toCompletedFuture(result); } + } + + private ListenableFuture getDeviceCreationFuture(String deviceName, String deviceType) { + SettableFuture future = deviceFutures.get(deviceName); + if (future == null) { + final SettableFuture futureToSet = SettableFuture.create(); + deviceFutures.put(deviceName, futureToSet); + try { + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .setGatewayIdMSB(gateway.getDeviceIdMSB()) + .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(), + new TransportServiceCallback() { + @Override + public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); + transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); + } + futureToSet.set(devices.get(deviceName)); + deviceFutures.remove(deviceName); + } + + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); + futureToSet.setException(e); + deviceFutures.remove(deviceName); + } + }); + return futureToSet; + } catch (Throwable e) { + deviceFutures.remove(deviceName); + throw e; + } + } else { + return future; + } + } + + private ListenableFuture toCompletedFuture(GatewayDeviceSessionCtx result) { + SettableFuture future = SettableFuture.create(); + future.set(result); return future; }