Improved persistent RPC for unack requests

This commit is contained in:
Andrii Shvaika 2021-07-14 14:23:54 +03:00
parent e914425b22
commit 0c60e18ea6
2 changed files with 35 additions and 34 deletions

View File

@ -492,13 +492,13 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
log.trace("[{}] Received RPC command to device", sessionId);
boolean sent = false;
try {
Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder);
int requestId = getNextMsgId();
response.setMID(requestId);
if (msg.getPersisted()) {
if (isConRequest()) {
if (msg.getPersisted() && isConRequest()) {
transportContext.getRpcAwaitingAck().put(requestId, msg);
transportContext.getScheduler().schedule(() -> {
TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
@ -512,16 +512,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
}
}));
} else {
transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY);
}
}
exchange.respond(response);
sent = true;
} catch (AdaptorException e) {
log.trace("Failed to reply due to error", e);
closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
closeAndDeregister();
} finally {
if (msg.getPersisted() && !isConRequest()) {
transportService.process(sessionInfo, msg, sent, TransportServiceCallback.EMPTY);
}
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.gson.JsonParseException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
@ -825,8 +826,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
if (rpcRequest.getPersisted()) {
if (isAckExpected(payload)) {
if (rpcRequest.getPersisted() && isAckExpected(payload)) {
rpcAwaitingAck.put(msgId, rpcRequest);
context.getScheduler().schedule(() -> {
TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId);
@ -834,11 +834,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY);
}
}, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
} else {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY);
}
var cf = publish(payload, deviceSessionCtx);
if (rpcRequest.getPersisted() && !isAckExpected(payload)) {
cf.addListener(result -> transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, result.cause() != null, TransportServiceCallback.EMPTY));
}
publish(payload, deviceSessionCtx);
});
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
@ -855,8 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
deviceSessionCtx.getChannel().writeAndFlush(message);
private ChannelFuture publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
return deviceSessionCtx.getChannel().writeAndFlush(message);
}
private boolean isAckExpected(MqttMessage message) {