From d3e16ad65009f7c80869dae628bf3aba47685071 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 22 Jun 2021 13:31:08 +0300 Subject: [PATCH] updated observe sessions closing actions --- .../transport/coap/CoapTransportResource.java | 123 +++++++++++------- .../transport/http/DeviceApiController.java | 6 +- .../lwm2m/server/LwM2mSessionMsgListener.java | 10 +- .../transport/mqtt/MqttTransportHandler.java | 5 +- .../mqtt/session/GatewayDeviceSessionCtx.java | 6 +- .../snmp/session/DeviceSessionContext.java | 6 +- .../common/transport/SessionMsgListener.java | 4 +- .../service/DefaultTransportService.java | 4 +- 8 files changed, 102 insertions(+), 62 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 95cdeada88..9f5d78b5aa 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -64,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; @Slf4j public class CoapTransportResource extends AbstractCoapTransportResource { @@ -75,9 +76,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; - private final ConcurrentMap tokenToSessionInfoMap = new ConcurrentHashMap<>(); - private final ConcurrentMap tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>(); - private final ConcurrentMap sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); + private final ConcurrentMap tokenToCoapSessionInfoMap = new ConcurrentHashMap<>(); + private final ConcurrentMap sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); private final Set rpcSubscriptions = ConcurrentHashMap.newKeySet(); private final Set attributeSubscriptions = ConcurrentHashMap.newKeySet(); @@ -93,7 +93,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.timeout = coapServerService.getTimeout(); this.sessionReportTimeout = ctx.getSessionReportTimeout(); ctx.getScheduler().scheduleAtFixedRate(() -> { - Set observeSessions = sessionInfoToObserveRelationMap.keySet(); + Set coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet(); + Set observeSessions = coapObserveSessionInfos + .stream() + .map(CoapObserveSessionInfo::getSessionInfoProto) + .collect(Collectors.toSet()); observeSessions.forEach(this::reportActivity); }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); } @@ -111,17 +115,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { relation.setEstablished(); addObserveRelation(relation); } - AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0)); - response.getOptions().setObserve(notificationCounter.getAndIncrement()); + AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter(); + response.getOptions().setObserve(observeNotificationCounter.getAndIncrement()); } // ObserveLayer takes care of the else case } - public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { + private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { relation.cancel(); relation.getExchange().sendResponse(new Response(code)); } - public Map getSessionInfoToObserveRelationMap() { + private Map getCoapSessionInfoToObserveRelationMap() { return sessionInfoToObserveRelationMap; } @@ -277,8 +281,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); - if (currentAttrSession == null) { + CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); + if (currentCoapObserveAttrSessionInfo == null) { attributeSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); @@ -290,20 +294,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: - TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); - if (attrSession != null) { + CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request)); + if (coapObserveAttrSessionInfo != null) { + TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto(); UUID attrSessionId = toSessionId(attrSession); attributeSubscriptions.remove(attrSessionId); - sessionInfoToObserveRelationMap.remove(attrSession); transportService.process(attrSession, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo); + new CoapNoOpCallback(exchange)); } + closeAndDeregister(sessionInfo); break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); - if (currentRpcSession == null) { + CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); + if (currentCoapObserveRpcSessionInfo == null) { rpcSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); @@ -314,16 +318,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: - TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); - if (rpcSession != null) { + CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request)); + if (coapObserveRpcSessionInfo != null) { + TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto(); UUID rpcSessionId = toSessionId(rpcSession); rpcSubscriptions.remove(rpcSessionId); - sessionInfoToObserveRelationMap.remove(rpcSession); transportService.process(rpcSession, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo); } + closeAndDeregister(sessionInfo); break; case TO_DEVICE_RPC_RESPONSE: transportService.process(sessionInfo, @@ -355,13 +359,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); } - private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) { - tokenToObserveNotificationSeqMap.remove(token); - return tokenToSessionInfoMap.remove(token); + private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) { + return tokenToCoapSessionInfoMap.remove(token); } private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { - tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); + tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo)); transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } @@ -474,40 +477,33 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } @Override - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg msg) { + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) { + log.trace("[{}] Received attributes update notification to device", sessionId); try { exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeAndDeregister(); } } @Override public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); - Map sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap(); - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { - Set observeSessions = sessionToObserveRelationMap.keySet(); - Optional observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> { - UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); - return observeSessionId.equals(sessionId); - }).findFirst(); - if (observeSessionToClose.isPresent()) { - TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get(); - ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto); - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE); - } - } + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE); + closeAndDeregister(); } @Override - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { + log.trace("[{}] Received RPC command to device", sessionId); try { exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeAndDeregister(); } } @@ -524,6 +520,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private boolean isConRequest() { return exchange.advanced().getRequest().isConfirmable(); } + + private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) { + Map sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap(); + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { + Optional observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> { + TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto(); + UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB()); + return observeSessionId.equals(sessionId); + }).findFirst(); + if (observeSessionToClose.isPresent()) { + CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get(); + ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo); + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode); + } + } + } + + private void closeAndDeregister() { + Request request = exchange.advanced().getRequest(); + String token = coapTransportResource.getTokenFromRequest(request); + CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token); + coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto()); + } + } public class CoapResourceObserver implements ResourceObserver { @@ -548,7 +568,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { public void addedObserveRelation(ObserveRelation relation) { Request request = relation.getExchange().getRequest(); String token = getTokenFromRequest(request); - sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); + sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation); log.trace("Added Observe relation for token: {}", token); } @@ -556,8 +576,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { public void removedObserveRelation(ObserveRelation relation) { Request request = relation.getExchange().getRequest(); String token = getTokenFromRequest(request); - TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token); - sessionInfoToObserveRelationMap.remove(session); + sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token)); log.trace("Relation removed for token: {}", token); } } @@ -568,7 +587,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { transportService.deregisterSession(session); rpcSubscriptions.remove(sessionId); attributeSubscriptions.remove(sessionId); - sessionInfoToObserveRelationMap.remove(session); } private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { @@ -634,4 +652,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.jsonPayload = jsonPayload; } } + + @Data + private static class CoapObserveSessionInfo { + + private final TransportProtos.SessionInfoProto sessionInfoProto; + private final AtomicInteger observeNotificationCounter; + + private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) { + this.sessionInfoProto = sessionInfoProto; + this.observeNotificationCounter = new AtomicInteger(0); + } + } + } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index f4a3f705af..79ff2b70d1 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -386,7 +386,8 @@ public class DeviceApiController implements TbTransportService { } @Override - public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) { + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg msg) { + log.trace("[{}] Received attributes update notification to device", sessionId); responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } @@ -397,7 +398,8 @@ public class DeviceApiController implements TbTransportService { } @Override - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) { + log.trace("[{}] Received RPC command to device", sessionId); responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java index f39ced6dfc..b2059d9557 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java @@ -52,9 +52,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener