From 0c60e18ea62649fd032198d2a62dca94d80ba0e0 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 14 Jul 2021 14:23:54 +0300 Subject: [PATCH] Improved persistent RPC for unack requests --- .../transport/coap/CoapTransportResource.java | 39 ++++++++++--------- .../transport/mqtt/MqttTransportHandler.java | 30 +++++++------- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 2facdcd305..d78f402e31 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -492,36 +492,37 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { log.trace("[{}] Received RPC command to device", sessionId); + boolean sent = false; try { Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); int requestId = getNextMsgId(); response.setMID(requestId); - if (msg.getPersisted()) { - if (isConRequest()) { - transportContext.getRpcAwaitingAck().put(requestId, msg); - transportContext.getScheduler().schedule(() -> { - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); - if (awaitingAckMsg != null) { - transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); - } - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { - TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); - if (rpcRequestMsg != null) { - transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); - } - })); - } else { - transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY); - } + if (msg.getPersisted() && isConRequest()) { + transportContext.getRpcAwaitingAck().put(requestId, msg); + transportContext.getScheduler().schedule(() -> { + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); + if (awaitingAckMsg != null) { + transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); + } + }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); + if (rpcRequestMsg != null) { + transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + } + })); } - exchange.respond(response); + sent = true; } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); closeAndDeregister(); + } finally { + if (msg.getPersisted() && !isConRequest()) { + transportService.process(sessionInfo, msg, sent, TransportServiceCallback.EMPTY); + } } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e09a47972b..6afdadffff 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; import com.google.gson.JsonParseException; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.MqttConnAckMessage; @@ -825,20 +826,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); - if (rpcRequest.getPersisted()) { - if (isAckExpected(payload)) { - rpcAwaitingAck.put(msgId, rpcRequest); - context.getScheduler().schedule(() -> { - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId); - if (awaitingAckMsg != null) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY); - } - }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - } else { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY); - } + if (rpcRequest.getPersisted() && isAckExpected(payload)) { + rpcAwaitingAck.put(msgId, rpcRequest); + context.getScheduler().schedule(() -> { + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId); + if (awaitingAckMsg != null) { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY); + } + }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); + } + var cf = publish(payload, deviceSessionCtx); + if (rpcRequest.getPersisted() && !isAckExpected(payload)) { + cf.addListener(result -> transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, result.cause() != null, TransportServiceCallback.EMPTY)); } - publish(payload, deviceSessionCtx); }); } catch (Exception e) { log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); @@ -855,8 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { - deviceSessionCtx.getChannel().writeAndFlush(message); + private ChannelFuture publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { + return deviceSessionCtx.getChannel().writeAndFlush(message); } private boolean isAckExpected(MqttMessage message) {