refactoring after review
This commit is contained in:
		
							parent
							
								
									d14ed53b64
								
							
						
					
					
						commit
						69902f3cd6
					
				@ -192,7 +192,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
        ToDeviceRpcRequest request = msg.getMsg();
 | 
			
		||||
        UUID rpcId = request.getId();
 | 
			
		||||
        log.debug("[{}][{}] Received RPC request to process ...", deviceId, rpcId);
 | 
			
		||||
        ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
 | 
			
		||||
        ToDeviceRpcRequestMsg rpcRequest = createToDeviceRpcRequestMsg(request);
 | 
			
		||||
 | 
			
		||||
        long timeout = request.getExpirationTime() - System.currentTimeMillis();
 | 
			
		||||
        boolean persisted = request.isPersisted();
 | 
			
		||||
@ -243,20 +243,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
        } else {
 | 
			
		||||
            registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
 | 
			
		||||
        }
 | 
			
		||||
        String rpcSent = sent ? "" : "NOT ";
 | 
			
		||||
        log.debug("[{}][{}][{}] RPC request is {}sent!", deviceId, rpcId, requestId, rpcSent);
 | 
			
		||||
        String rpcSent = sent ? "sent!" : "NOT sent!";
 | 
			
		||||
        log.debug("[{}][{}][{}] RPC request is {}", deviceId, rpcId, requestId, rpcSent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isSendNewRpcAvailable() {
 | 
			
		||||
        if (rpcSequential) {
 | 
			
		||||
            if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_ACK_FROM_DEVICE)) {
 | 
			
		||||
        switch (rpcSubmitStrategy) {
 | 
			
		||||
            case SEQUENTIAL_ON_ACK_FROM_DEVICE:
 | 
			
		||||
                return toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
 | 
			
		||||
            }
 | 
			
		||||
            if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
 | 
			
		||||
            case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE:
 | 
			
		||||
                return toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty();
 | 
			
		||||
            }
 | 
			
		||||
            default:
 | 
			
		||||
                return true;
 | 
			
		||||
        }
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void createRpc(ToDeviceRpcRequest request, RpcStatus status) {
 | 
			
		||||
@ -271,7 +270,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
        systemContext.getTbRpcService().save(tenantId, rpc);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
 | 
			
		||||
    private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
 | 
			
		||||
        ToDeviceRpcRequestBody body = request.getBody();
 | 
			
		||||
        return ToDeviceRpcRequestMsg.newBuilder()
 | 
			
		||||
                .setRequestId(rpcSeq++)
 | 
			
		||||
@ -647,12 +646,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
            } finally {
 | 
			
		||||
                if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
 | 
			
		||||
                    clearAwaitRpcResponseScheduler();
 | 
			
		||||
                    String errorResponse = hasError ? "error " : "";
 | 
			
		||||
                    String errorResponse = hasError ? "error response" : "response";
 | 
			
		||||
                    String rpcState = delivered ? "" : "undelivered ";
 | 
			
		||||
                    sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState));
 | 
			
		||||
                    sendNextPendingRequest(rpcId, requestId, String.format("Received %s for %sRPC!", errorResponse, rpcState));
 | 
			
		||||
                } else if (!delivered) {
 | 
			
		||||
                    String errorResponse = hasError ? "error " : "";
 | 
			
		||||
                    sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for undelivered RPC!", errorResponse));
 | 
			
		||||
                    String errorResponse = hasError ? "error response" : "response";
 | 
			
		||||
                    sendNextPendingRequest(rpcId, requestId, String.format("Received %s for undelivered RPC!", errorResponse));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
@ -1049,7 +1048,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
                    rpc.setStatus(RpcStatus.EXPIRED);
 | 
			
		||||
                    systemContext.getTbRpcService().save(tenantId, rpc);
 | 
			
		||||
                } else {
 | 
			
		||||
                    registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
 | 
			
		||||
                    registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, createToDeviceRpcRequestMsg(msg), timeout);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            if (pageData.hasNext()) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user