Merge branch 'master' of github.com:thingsboard/thingsboard
This commit is contained in:
		
						commit
						1f3e3d579f
					
				@ -273,13 +273,13 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
 | 
			
		||||
        telemetryService.saveAndNotify(tenantId, deviceId, Collections.singletonList(status), new FutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable Void tmp) {
 | 
			
		||||
                log.trace("[{}] Success save telemetry with target firmware for device!", deviceId);
 | 
			
		||||
                log.trace("[{}] Success save telemetry with target {} for device!", deviceId, otaPackage);
 | 
			
		||||
                updateAttributes(device, otaPackage, ts, tenantId, deviceId, otaPackageType);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                log.error("[{}] Failed to save telemetry with target firmware for device!", deviceId, t);
 | 
			
		||||
                log.error("[{}] Failed to save telemetry with target {} for device!", deviceId, otaPackage, t);
 | 
			
		||||
                updateAttributes(device, otaPackage, ts, tenantId, deviceId, otaPackageType);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
@ -311,7 +311,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
 | 
			
		||||
                                , sessionInfo, getTokenFromRequest(request));
 | 
			
		||||
                        transportService.process(sessionInfo,
 | 
			
		||||
                                TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
 | 
			
		||||
                                 new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
 | 
			
		||||
                                new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
 | 
			
		||||
                        );
 | 
			
		||||
                    }
 | 
			
		||||
                    break;
 | 
			
		||||
@ -496,21 +496,27 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
 | 
			
		||||
                Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder);
 | 
			
		||||
                int requestId = getNextMsgId();
 | 
			
		||||
                response.setMID(requestId);
 | 
			
		||||
 | 
			
		||||
                if (msg.getPersisted()) {
 | 
			
		||||
                    transportContext.getRpcAwaitingAck().put(requestId, msg);
 | 
			
		||||
                    transportContext.getScheduler().schedule(() -> {
 | 
			
		||||
                        TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
 | 
			
		||||
                        if (awaitingAckMsg != null) {
 | 
			
		||||
                            transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY);
 | 
			
		||||
                        }
 | 
			
		||||
                    }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
 | 
			
		||||
                }
 | 
			
		||||
                response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
 | 
			
		||||
                    TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
 | 
			
		||||
                    if (rpcRequestMsg != null) {
 | 
			
		||||
                        transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
 | 
			
		||||
                    if (isConRequest()) {
 | 
			
		||||
                        transportContext.getRpcAwaitingAck().put(requestId, msg);
 | 
			
		||||
                        transportContext.getScheduler().schedule(() -> {
 | 
			
		||||
                            TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
 | 
			
		||||
                            if (awaitingAckMsg != null) {
 | 
			
		||||
                                transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY);
 | 
			
		||||
                            }
 | 
			
		||||
                        }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
 | 
			
		||||
                        response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
 | 
			
		||||
                            TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
 | 
			
		||||
                            if (rpcRequestMsg != null) {
 | 
			
		||||
                                transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
 | 
			
		||||
                            }
 | 
			
		||||
                        }));
 | 
			
		||||
                    } else {
 | 
			
		||||
                        transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY);
 | 
			
		||||
                    }
 | 
			
		||||
                }));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                exchange.respond(response);
 | 
			
		||||
            } catch (AdaptorException e) {
 | 
			
		||||
                log.trace("Failed to reply due to error", e);
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,6 @@ import io.netty.util.CharsetUtil;
 | 
			
		||||
import io.netty.util.ReferenceCountUtil;
 | 
			
		||||
import io.netty.util.concurrent.Future;
 | 
			
		||||
import io.netty.util.concurrent.GenericFutureListener;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
@ -825,11 +824,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        log.trace("[{}] Received RPC command to device", sessionId);
 | 
			
		||||
        try {
 | 
			
		||||
            deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
 | 
			
		||||
                RequestInfo requestInfo = publish(payload, deviceSessionCtx);
 | 
			
		||||
                int msgId = requestInfo.getMsgId();
 | 
			
		||||
 | 
			
		||||
                if (isAckExpected(payload)) {
 | 
			
		||||
                    if (rpcRequest.getPersisted()) {
 | 
			
		||||
                int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
 | 
			
		||||
                if (rpcRequest.getPersisted()) {
 | 
			
		||||
                    if (isAckExpected(payload)) {
 | 
			
		||||
                        rpcAwaitingAck.put(msgId, rpcRequest);
 | 
			
		||||
                        context.getScheduler().schedule(() -> {
 | 
			
		||||
                            TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId);
 | 
			
		||||
@ -837,8 +834,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
                                transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY);
 | 
			
		||||
                            }
 | 
			
		||||
                        }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                publish(payload, deviceSessionCtx);
 | 
			
		||||
            });
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
 | 
			
		||||
@ -855,13 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private RequestInfo publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
 | 
			
		||||
    private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
 | 
			
		||||
        deviceSessionCtx.getChannel().writeAndFlush(message);
 | 
			
		||||
 | 
			
		||||
        int msgId = ((MqttPublishMessage) message).variableHeader().packetId();
 | 
			
		||||
        RequestInfo requestInfo = new RequestInfo(msgId, System.currentTimeMillis(), deviceSessionCtx.getSessionInfo());
 | 
			
		||||
 | 
			
		||||
        return requestInfo;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isAckExpected(MqttMessage message) {
 | 
			
		||||
@ -877,11 +872,4 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
    public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
 | 
			
		||||
        deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data
 | 
			
		||||
    public static class RequestInfo {
 | 
			
		||||
        private final int msgId;
 | 
			
		||||
        private final long requestTime;
 | 
			
		||||
        private final TransportProtos.SessionInfoProto sessionInfo;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user