added new RPC statuses

This commit is contained in:
YevhenBondarenko 2021-08-17 13:25:24 +03:00
parent 6436c8a26c
commit 8869dc0cb0
14 changed files with 114 additions and 41 deletions

View File

@ -400,6 +400,14 @@ public class ActorSystemContext {
@Getter
private String debugPerTenantLimitsConfiguration;
@Value("${actors.rpc.sequence.enabled:true}")
@Getter
private boolean rpcSequenceEnabled;
@Value("${actors.rpc.persistent.retries:5}")
@Getter
private int maxPersistentRpcRetries;
@Getter
@Setter
private TbActorSystem actorSystem;

View File

@ -121,6 +121,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private final Map<UUID, SessionInfo> attributeSubscriptions;
private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
private final boolean rpcSequenceEnabled;
private int rpcSeq = 0;
private String deviceName;
@ -132,6 +133,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
super(systemContext);
this.tenantId = tenantId;
this.deviceId = deviceId;
this.rpcSequenceEnabled = systemContext.isRpcSequenceEnabled();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.toDeviceRpcPendingMap = new LinkedHashMap<>();
@ -185,19 +187,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (timeout <= 0) {
log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
if (persisted) {
createRpc(request, RpcStatus.TIMEOUT);
createRpc(request, RpcStatus.EXPIRED);
}
return;
} else if (persisted) {
createRpc(request, RpcStatus.QUEUED);
}
boolean sent;
boolean sent = false;
if (systemContext.isEdgesEnabled() && edgeId != null) {
log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
sent = true;
} else {
} else if (!rpcSequenceEnabled || toDeviceRpcPendingMap.isEmpty()) {
sent = rpcSubscriptions.size() > 0;
Set<UUID> syncSessionSet = new HashSet<>();
rpcSubscriptions.forEach((key, value) -> {
@ -292,7 +294,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (requestMd != null) {
log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
if (requestMd.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
}
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
@ -300,7 +302,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
private void sendPendingRequest(TbActorCtx context, UUID sessionId, String nodeId) {
private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
SessionType sessionType = getSessionType(sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@ -312,11 +314,34 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (sessionType == SessionType.ASYNC) {
if (rpcSequenceEnabled) {
List<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> entries = new ArrayList<>();
for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {
if (entry.getValue().isDelivered()) {
continue;
}
entries.add(entry);
if (entry.getValue().getMsg().getMsg().isPersisted() || entry.getValue().getMsg().getMsg().isOneway()) {
break;
}
}
entries.forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
} else {
toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
}
} else {
toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
}
sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
}
private void sendNextPendingRequest(TbActorCtx context) {
rpcSubscriptions.forEach((id, s) -> sendPendingRequest(context, id, s.getNodeId()));
if (rpcSequenceEnabled) {
rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
}
}
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
@ -338,11 +363,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setPersisted(request.isPersisted())
.build();
sendToTransport(rpcRequest, sessionId, nodeId);
if (SessionType.ASYNC.equals(getSessionType(sessionId)) && request.isOneway() && !request.isPersisted()) {
toDeviceRpcPendingMap.remove(entry.getKey());
sendPendingRequest(context, sessionId, nodeId);
}
};
}
@ -361,7 +381,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
}
if (msg.hasSendPendingRPC()) {
sendPendingRequest(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
}
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
@ -559,16 +579,28 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) {
UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
ToDeviceRpcRequestMetadata md;
if (RpcStatus.DELIVERED.equals(status)) {
md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
} else {
md = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
}
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
if (md != null) {
if (status.equals(RpcStatus.DELIVERED)) {
if (md.getMsg().getMsg().isOneway()) {
toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
} else {
md.setDelivered(true);
}
} else if (status.equals(RpcStatus.TIMEOUT)) {
if (systemContext.getMaxPersistentRpcRetries() <= md.getRetries()) {
toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
status = RpcStatus.FAILED;
} else {
md.setRetries(md.getRetries() + 1);
}
}
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null);
if (status != RpcStatus.SENT) {
sendNextPendingRequest(context);
}
} else {
log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
}
@ -608,7 +640,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionMD.setSubscribedToRPC(true);
log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
sendPendingRequest(context, sessionId, sessionInfo.getNodeId());
sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
dumpSessions();
}
}
@ -884,7 +916,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
rpc.setStatus(RpcStatus.TIMEOUT);
rpc.setStatus(RpcStatus.EXPIRED);
systemContext.getTbRpcService().save(tenantId, rpc);
} else {
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);

View File

@ -25,4 +25,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
public class ToDeviceRpcRequestMetadata {
private final ToDeviceRpcRequestActorMsg msg;
private final boolean sent;
private int retries;
private boolean delivered;
}

View File

@ -326,6 +326,11 @@ actors:
queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
# Time in milliseconds for transaction to complete
duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
rpc:
persistent:
retries: "${ACTORS_RPC_PERSISTENT_RETRIES:5}"
sequence:
enabled: "${ACTORS_RPC_SEQUENCE_ENABLED:true}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.common.data.rpc;
public enum RpcStatus {
QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED
}

View File

@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.SessionMsgListener;
@ -532,7 +533,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
if (rpcRequestMsg != null) {
transportService.process(state.getSession(), rpcRequestMsg, TransportServiceCallback.EMPTY);
transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
}, null));
}
@ -553,8 +554,12 @@ public class DefaultCoapClientContext implements CoapClientContext {
transportService.process(state.getSession(),
TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
} else if (msg.getPersisted() && !conRequest && sent) {
transportService.process(state.getSession(), msg, TransportServiceCallback.EMPTY);
} else if (msg.getPersisted() && sent) {
if (conRequest) {
transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);
} else {
transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
}
}
}

View File

@ -409,7 +409,7 @@ public class DeviceApiController implements TbTransportService {
public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
log.trace("[{}] Received RPC command to device", sessionId);
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
transportService.process(sessionInfo, msg, TransportServiceCallback.EMPTY);
transportService.process(sessionInfo, msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
@Override

View File

@ -21,7 +21,9 @@ import org.eclipse.leshan.core.ResponseCode;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
@ -158,6 +160,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
}
}
transportService.process(client.getSession(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
} catch (IllegalArgumentException e) {
this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
}

View File

@ -19,6 +19,7 @@ import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.request.exception.ClientSleepingException;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -44,7 +45,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
@Override
public void onSuccess(R request, T response) {
transportService.process(client.getSession(), this.request, TransportServiceCallback.EMPTY);
transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
sendRpcReplyOnSuccess(response);
if (callback != null) {
callback.onSuccess(request, response);
@ -61,7 +62,9 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
@Override
public void onError(String params, Exception e) {
if (!(e instanceof TimeoutException || e instanceof ClientSleepingException)) {
if (e instanceof TimeoutException) {
transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
} else if (!(e instanceof ClientSleepingException)) {
sendRpcReplyOnError(e);
}
if (callback != null) {

View File

@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener;
@ -272,7 +273,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
if (rpcRequest != null) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
break;
default:
@ -856,10 +857,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
}
var cf = publish(payload, deviceSessionCtx);
if (rpcRequest.getPersisted() && !isAckExpected(payload)) {
if (rpcRequest.getPersisted()) {
cf.addListener(result -> {
if (result.cause() == null) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
if (isAckExpected(payload)) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
} else {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
}
});
}

View File

@ -16,8 +16,10 @@
package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
@ -102,9 +104,13 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
payload -> {
ChannelFuture channelFuture = parent.writeAndFlush(payload);
if (request.getPersisted()) {
channelFuture.addListener(future -> {
if (future.cause() == null) {
transportService.process(getSessionInfo(), request, TransportServiceCallback.EMPTY);
channelFuture.addListener(result -> {
if (result.cause() == null) {
if (isAckExpected(payload)) {
transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
} else {
transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
}
});
}
@ -129,4 +135,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
// This feature is not supported in the TB IoT Gateway yet.
}
private boolean isAckExpected(MqttMessage message) {
return message.fixedHeader().qosLevel().value() > 0;
}
}

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
@ -142,7 +143,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, TransportServiceCallback.EMPTY);
snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
}
@Override

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData;
@ -112,7 +113,7 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);

View File

@ -580,15 +580,13 @@ public class DefaultTransportService implements TransportService {
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
if (msg.getPersisted()) {
RpcStatus status = msg.getOneway() ? RpcStatus.SUCCESSFUL : RpcStatus.DELIVERED;
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(msg.getRequestId())
.setRequestIdLSB(msg.getRequestIdLSB())
.setRequestIdMSB(msg.getRequestIdMSB())
.setStatus(status.name())
.setStatus(rpcStatus.name())
.build();
if (checkLimits(sessionInfo, responseMsg, callback)) {