From 63342dfb7f7791a6b4940b71c8d122f1a835a20f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 14 Jul 2021 08:58:50 +0300 Subject: [PATCH] saving persistent rpc if ack not expected --- .../transport/coap/CoapTransportResource.java | 36 +++++++++++-------- .../transport/mqtt/MqttTransportHandler.java | 22 +++--------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 449d371a2f..33834eed22 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -311,7 +311,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { , sessionInfo, getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), - new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) ); } break; @@ -496,21 +496,27 @@ public class CoapTransportResource extends AbstractCoapTransportResource { Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); int requestId = getNextMsgId(); response.setMID(requestId); - if (msg.getPersisted()) { - transportContext.getRpcAwaitingAck().put(requestId, msg); - transportContext.getScheduler().schedule(() -> { - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); - if (awaitingAckMsg != null) { - transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); - } - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - } - response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { - TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); - if (rpcRequestMsg != null) { - transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + + if (isConRequest()) { + if (msg.getPersisted()) { + transportContext.getRpcAwaitingAck().put(requestId, msg); + transportContext.getScheduler().schedule(() -> { + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); + if (awaitingAckMsg != null) { + transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); + } + }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); } - })); + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); + if (rpcRequestMsg != null) { + transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + } + })); + } else if (msg.getPersisted()) { + transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY); + } + exchange.respond(response); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 23f3a9cc5f..c42bb0163b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -39,7 +39,6 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.common.data.DataConstants; @@ -825,9 +824,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Received RPC command to device", sessionId); try { deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { - RequestInfo requestInfo = publish(payload, deviceSessionCtx); - int msgId = requestInfo.getMsgId(); - + int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); if (isAckExpected(payload)) { if (rpcRequest.getPersisted()) { rpcAwaitingAck.put(msgId, rpcRequest); @@ -838,7 +835,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); } + } else if (rpcRequest.getPersisted()) { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY); } + publish(payload, deviceSessionCtx); }); } catch (Exception e) { log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); @@ -855,13 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private RequestInfo publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { + private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { deviceSessionCtx.getChannel().writeAndFlush(message); - - int msgId = ((MqttPublishMessage) message).variableHeader().packetId(); - RequestInfo requestInfo = new RequestInfo(msgId, System.currentTimeMillis(), deviceSessionCtx.getSessionInfo()); - - return requestInfo; } private boolean isAckExpected(MqttMessage message) { @@ -877,11 +872,4 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); } - - @Data - public static class RequestInfo { - private final int msgId; - private final long requestTime; - private final TransportProtos.SessionInfoProto sessionInfo; - } }