Merge branch 'master' of github.com:thingsboard/thingsboard into develop/3.0
This commit is contained in:
commit
d3cbfdce27
@ -54,6 +54,8 @@ import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.17.
|
||||
@ -69,7 +71,9 @@ public class GatewaySessionHandler {
|
||||
private final TransportService transportService;
|
||||
private final DeviceInfoProto gateway;
|
||||
private final UUID sessionId;
|
||||
private final Map<String, GatewayDeviceSessionCtx> devices;
|
||||
private final Lock deviceCreationLock = new ReentrantLock();
|
||||
private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
|
||||
private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures;
|
||||
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
|
||||
private final ChannelHandlerContext channel;
|
||||
private final DeviceSessionCtx deviceSessionCtx;
|
||||
@ -81,6 +85,7 @@ public class GatewaySessionHandler {
|
||||
this.gateway = deviceSessionCtx.getDeviceInfo();
|
||||
this.sessionId = sessionId;
|
||||
this.devices = new ConcurrentHashMap<>();
|
||||
this.deviceFutures = new ConcurrentHashMap<>();
|
||||
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
|
||||
this.channel = deviceSessionCtx.getChannel();
|
||||
}
|
||||
@ -106,37 +111,70 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
|
||||
SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create();
|
||||
GatewayDeviceSessionCtx result = devices.get(deviceName);
|
||||
if (result == null) {
|
||||
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
|
||||
.setDeviceName(deviceName)
|
||||
.setDeviceType(deviceType)
|
||||
.setGatewayIdMSB(gateway.getDeviceIdMSB())
|
||||
.setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
|
||||
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||
transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
|
||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
|
||||
}
|
||||
future.set(devices.get(deviceName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
|
||||
future.setException(e);
|
||||
}
|
||||
});
|
||||
deviceCreationLock.lock();
|
||||
try {
|
||||
result = devices.get(deviceName);
|
||||
if (result == null) {
|
||||
return getDeviceCreationFuture(deviceName, deviceType);
|
||||
} else {
|
||||
return toCompletedFuture(result);
|
||||
}
|
||||
} finally {
|
||||
deviceCreationLock.unlock();
|
||||
}
|
||||
} else {
|
||||
future.set(result);
|
||||
return toCompletedFuture(result);
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
|
||||
SettableFuture<GatewayDeviceSessionCtx> future = deviceFutures.get(deviceName);
|
||||
if (future == null) {
|
||||
final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
|
||||
deviceFutures.put(deviceName, futureToSet);
|
||||
try {
|
||||
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
|
||||
.setDeviceName(deviceName)
|
||||
.setDeviceType(deviceType)
|
||||
.setGatewayIdMSB(gateway.getDeviceIdMSB())
|
||||
.setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
|
||||
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||
transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
|
||||
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
|
||||
}
|
||||
futureToSet.set(devices.get(deviceName));
|
||||
deviceFutures.remove(deviceName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
|
||||
futureToSet.setException(e);
|
||||
deviceFutures.remove(deviceName);
|
||||
}
|
||||
});
|
||||
return futureToSet;
|
||||
} catch (Throwable e) {
|
||||
deviceFutures.remove(deviceName);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> toCompletedFuture(GatewayDeviceSessionCtx result) {
|
||||
SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create();
|
||||
future.set(result);
|
||||
return future;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user