gateway session - weak map for locks (auto cleanup), getDeviceCreationFuture refactored using concurrent putIfAbsent

This commit is contained in:
Sergey Matvienko 2021-07-30 17:27:24 +03:00 committed by Andrew Shvayka
parent c9c13457e1
commit 89e3ba253c

View File

@ -34,6 +34,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
@ -66,6 +67,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType;
/** /**
* Created by ashvayka on 19.01.17. * Created by ashvayka on 19.01.17.
*/ */
@ -82,7 +85,7 @@ public class GatewaySessionHandler {
private final UUID sessionId; private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap; private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices; private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures; private final ConcurrentMap<String, ListenableFuture<GatewayDeviceSessionCtx>> deviceFutures;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final ChannelHandlerContext channel; private final ChannelHandlerContext channel;
private final DeviceSessionCtx deviceSessionCtx; private final DeviceSessionCtx deviceSessionCtx;
@ -100,6 +103,10 @@ public class GatewaySessionHandler {
this.channel = deviceSessionCtx.getChannel(); this.channel = deviceSessionCtx.getChannel();
} }
ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK);
}
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
if (isJsonPayloadType()) { if (isJsonPayloadType()) {
onDeviceConnectJson(mqttMsg); onDeviceConnectJson(mqttMsg);
@ -228,21 +235,22 @@ public class GatewaySessionHandler {
if (result == null) { if (result == null) {
return getDeviceCreationFuture(deviceName, deviceType); return getDeviceCreationFuture(deviceName, deviceType);
} else { } else {
return toCompletedFuture(result); return Futures.immediateFuture(result);
} }
} finally { } finally {
deviceCreationLock.unlock(); deviceCreationLock.unlock();
} }
} else { } else {
return toCompletedFuture(result); return Futures.immediateFuture(result);
} }
} }
private ListenableFuture<GatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) { private ListenableFuture<GatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
SettableFuture<GatewayDeviceSessionCtx> future = deviceFutures.get(deviceName); final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
if (future == null) { ListenableFuture<GatewayDeviceSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create(); if (future != null) {
deviceFutures.put(deviceName, futureToSet); return future;
}
try { try {
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName) .setDeviceName(deviceName)
@ -282,15 +290,6 @@ public class GatewaySessionHandler {
deviceFutures.remove(deviceName); deviceFutures.remove(deviceName);
throw e; throw e;
} }
} else {
return future;
}
}
private ListenableFuture<GatewayDeviceSessionCtx> toCompletedFuture(GatewayDeviceSessionCtx result) {
SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create();
future.set(result);
return future;
} }
private int getMsgId(MqttPublishMessage mqttMsg) { private int getMsgId(MqttPublishMessage mqttMsg) {