saving persistent rpc if ack not expected

This commit is contained in:
YevhenBondarenko 2021-07-14 08:58:50 +03:00
parent 4e229ea24e
commit 63342dfb7f
2 changed files with 26 additions and 32 deletions

View File

@ -311,7 +311,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
, sessionInfo, getTokenFromRequest(request)); , sessionInfo, getTokenFromRequest(request));
transportService.process(sessionInfo, transportService.process(sessionInfo,
TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
); );
} }
break; break;
@ -496,21 +496,27 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder);
int requestId = getNextMsgId(); int requestId = getNextMsgId();
response.setMID(requestId); response.setMID(requestId);
if (msg.getPersisted()) {
transportContext.getRpcAwaitingAck().put(requestId, msg); if (isConRequest()) {
transportContext.getScheduler().schedule(() -> { if (msg.getPersisted()) {
TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); transportContext.getRpcAwaitingAck().put(requestId, msg);
if (awaitingAckMsg != null) { transportContext.getScheduler().schedule(() -> {
transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
} if (awaitingAckMsg != null) {
}, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY);
} }
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
if (rpcRequestMsg != null) {
transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
} }
})); response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
if (rpcRequestMsg != null) {
transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
}
}));
} else if (msg.getPersisted()) {
transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY);
}
exchange.respond(response); exchange.respond(response);
} catch (AdaptorException e) { } catch (AdaptorException e) {
log.trace("Failed to reply due to error", e); log.trace("Failed to reply due to error", e);

View File

@ -39,7 +39,6 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
@ -825,9 +824,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.trace("[{}] Received RPC command to device", sessionId); log.trace("[{}] Received RPC command to device", sessionId);
try { try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
RequestInfo requestInfo = publish(payload, deviceSessionCtx); int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
int msgId = requestInfo.getMsgId();
if (isAckExpected(payload)) { if (isAckExpected(payload)) {
if (rpcRequest.getPersisted()) { if (rpcRequest.getPersisted()) {
rpcAwaitingAck.put(msgId, rpcRequest); rpcAwaitingAck.put(msgId, rpcRequest);
@ -838,7 +835,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
}, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
} }
} else if (rpcRequest.getPersisted()) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY);
} }
publish(payload, deviceSessionCtx);
}); });
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
@ -855,13 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private RequestInfo publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
deviceSessionCtx.getChannel().writeAndFlush(message); deviceSessionCtx.getChannel().writeAndFlush(message);
int msgId = ((MqttPublishMessage) message).variableHeader().packetId();
RequestInfo requestInfo = new RequestInfo(msgId, System.currentTimeMillis(), deviceSessionCtx.getSessionInfo());
return requestInfo;
} }
private boolean isAckExpected(MqttMessage message) { private boolean isAckExpected(MqttMessage message) {
@ -877,11 +872,4 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
} }
@Data
public static class RequestInfo {
private final int msgId;
private final long requestTime;
private final TransportProtos.SessionInfoProto sessionInfo;
}
} }