diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 85b0f5e03d..455c0b9a01 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; @Slf4j public class CoapTransportResource extends AbstractCoapTransportResource { @@ -76,9 +77,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; - private final ConcurrentMap tokenToSessionInfoMap = new ConcurrentHashMap<>(); - private final ConcurrentMap tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>(); - private final ConcurrentMap sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); + private final ConcurrentMap tokenToCoapSessionInfoMap = new ConcurrentHashMap<>(); + private final ConcurrentMap sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); private final Set rpcSubscriptions = ConcurrentHashMap.newKeySet(); private final Set attributeSubscriptions = ConcurrentHashMap.newKeySet(); @@ -94,7 +94,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.timeout = coapServerService.getTimeout(); this.sessionReportTimeout = ctx.getSessionReportTimeout(); ctx.getScheduler().scheduleAtFixedRate(() -> { - Set observeSessions = sessionInfoToObserveRelationMap.keySet(); + Set coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet(); + Set observeSessions = coapObserveSessionInfos + .stream() + .map(CoapObserveSessionInfo::getSessionInfoProto) + .collect(Collectors.toSet()); observeSessions.forEach(this::reportActivity); }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); } @@ -112,17 +116,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { relation.setEstablished(); addObserveRelation(relation); } - AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0)); - response.getOptions().setObserve(notificationCounter.getAndIncrement()); + AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter(); + response.getOptions().setObserve(observeNotificationCounter.getAndIncrement()); } // ObserveLayer takes care of the else case } - public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { + private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { relation.cancel(); relation.getExchange().sendResponse(new Response(code)); } - public Map getSessionInfoToObserveRelationMap() { + private Map getCoapSessionInfoToObserveRelationMap() { return sessionInfoToObserveRelationMap; } @@ -278,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); - if (currentAttrSession == null) { + CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); + if (currentCoapObserveAttrSessionInfo == null) { attributeSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); @@ -291,20 +295,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: - TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); - if (attrSession != null) { + CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request)); + if (coapObserveAttrSessionInfo != null) { + TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto(); UUID attrSessionId = toSessionId(attrSession); attributeSubscriptions.remove(attrSessionId); - sessionInfoToObserveRelationMap.remove(attrSession); transportService.process(attrSession, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo); + new CoapNoOpCallback(exchange)); } + closeAndDeregister(sessionInfo); break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); - if (currentRpcSession == null) { + CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); + if (currentCoapObserveRpcSessionInfo == null) { rpcSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); @@ -315,16 +319,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: - TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); - if (rpcSession != null) { + CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request)); + if (coapObserveRpcSessionInfo != null) { + TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto(); UUID rpcSessionId = toSessionId(rpcSession); rpcSubscriptions.remove(rpcSessionId); - sessionInfoToObserveRelationMap.remove(rpcSession); transportService.process(rpcSession, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo); } + closeAndDeregister(sessionInfo); break; case TO_DEVICE_RPC_RESPONSE: transportService.process(sessionInfo, @@ -356,13 +360,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); } - private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) { - tokenToObserveNotificationSeqMap.remove(token); - return tokenToSessionInfoMap.remove(token); + private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) { + return tokenToCoapSessionInfoMap.remove(token); } private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { - tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); + tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo)); transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } @@ -477,45 +480,36 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } @Override - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg msg) { + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) { + log.trace("[{}] Received attributes update notification to device", sessionId); try { exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeAndDeregister(); } } @Override public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); - Map sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap(); - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { - Set observeSessions = sessionToObserveRelationMap.keySet(); - Optional observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> { - UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); - return observeSessionId.equals(sessionId); - }).findFirst(); - if (observeSessionToClose.isPresent()) { - TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get(); - ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto); - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE); - } - } + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE); + closeAndDeregister(); } @Override - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { - boolean successful; + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { + log.trace("[{}] Received RPC command to device", sessionId); + boolean successful = true; try { exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); - successful = true; } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); successful = false; - } - if (msg.getPersisted()) { + } finally { + if (msg.getPersisted()) { RpcStatus status; if (!successful) { status = RpcStatus.FAILED; @@ -531,6 +525,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource { .setStatus(status.name()) .build(); coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); + } + if (!successful) { + closeAndDeregister(); + } } } @@ -547,6 +545,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private boolean isConRequest() { return exchange.advanced().getRequest().isConfirmable(); } + + private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) { + Map sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap(); + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { + Optional observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> { + TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto(); + UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB()); + return observeSessionId.equals(sessionId); + }).findFirst(); + if (observeSessionToClose.isPresent()) { + CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get(); + ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo); + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode); + } + } + } + + private void closeAndDeregister() { + Request request = exchange.advanced().getRequest(); + String token = coapTransportResource.getTokenFromRequest(request); + CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token); + coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto()); + } + } public class CoapResourceObserver implements ResourceObserver { @@ -571,7 +593,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { public void addedObserveRelation(ObserveRelation relation) { Request request = relation.getExchange().getRequest(); String token = getTokenFromRequest(request); - sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); + sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation); log.trace("Added Observe relation for token: {}", token); } @@ -579,8 +601,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { public void removedObserveRelation(ObserveRelation relation) { Request request = relation.getExchange().getRequest(); String token = getTokenFromRequest(request); - TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token); - sessionInfoToObserveRelationMap.remove(session); + sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token)); log.trace("Relation removed for token: {}", token); } } @@ -591,7 +612,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { transportService.deregisterSession(session); rpcSubscriptions.remove(sessionId); attributeSubscriptions.remove(sessionId); - sessionInfoToObserveRelationMap.remove(session); } private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { @@ -657,4 +677,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { this.jsonPayload = jsonPayload; } } + + @Data + private static class CoapObserveSessionInfo { + + private final TransportProtos.SessionInfoProto sessionInfoProto; + private final AtomicInteger observeNotificationCounter; + + private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) { + this.sessionInfoProto = sessionInfoProto; + this.observeNotificationCounter = new AtomicInteger(0); + } + } + } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index cfb25e9d78..9122126557 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -393,7 +393,8 @@ public class DeviceApiController implements TbTransportService { } @Override - public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) { + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg msg) { + log.trace("[{}] Received attributes update notification to device", sessionId); responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } @@ -404,7 +405,8 @@ public class DeviceApiController implements TbTransportService { } @Override - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) { + log.trace("[{}] Received RPC command to device", sessionId); responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); if (msg.getPersisted()) { RpcStatus status; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java index bcc23dd778..7ebc6b6808 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java @@ -56,9 +56,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index da459214c0..b8095a6eaf 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -129,7 +129,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S } @Override - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) { + log.trace("[{}] Received attributes update notification to device", sessionId); snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification); } @@ -139,7 +140,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S } @Override - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) { + log.trace("[{}] Received RPC command to device", sessionId); snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); if (toDeviceRequest.getPersisted()) { RpcStatus status; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java index 2eaf8dbf7c..644da7f4ec 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java @@ -36,11 +36,11 @@ public interface SessionMsgListener { void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); - void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification); + void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification); void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification); - void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest); + void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest); void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index e692d3b002..9c5463e625 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -20,8 +20,6 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.service.SessionMetaData; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg; @@ -30,9 +28,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetResourceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetSnmpDevicesRequestMsg; @@ -48,10 +46,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MCredentialsRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; @@ -110,7 +109,7 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); - void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback callback); void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index b4e536a49b..3fc0950469 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -752,7 +752,7 @@ public class DefaultTransportService implements TransportService { listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); } if (toSessionMsg.hasAttributeUpdateNotification()) { - listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); + listener.onAttributeUpdate(sessionId, toSessionMsg.getAttributeUpdateNotification()); } if (toSessionMsg.hasSessionCloseNotification()) { listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification()); @@ -761,7 +761,7 @@ public class DefaultTransportService implements TransportService { listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification()); } if (toSessionMsg.hasToDeviceRequest()) { - listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); + listener.onToDeviceRpcRequest(sessionId, toSessionMsg.getToDeviceRequest()); } if (toSessionMsg.hasToServerResponse()) { String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();