diff --git a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java index 76797137b7..2d6de7cfdf 100644 --- a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java @@ -273,13 +273,13 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { telemetryService.saveAndNotify(tenantId, deviceId, Collections.singletonList(status), new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { - log.trace("[{}] Success save telemetry with target firmware for device!", deviceId); + log.trace("[{}] Success save telemetry with target {} for device!", deviceId, otaPackage); updateAttributes(device, otaPackage, ts, tenantId, deviceId, otaPackageType); } @Override public void onFailure(Throwable t) { - log.error("[{}] Failed to save telemetry with target firmware for device!", deviceId, t); + log.error("[{}] Failed to save telemetry with target {} for device!", deviceId, otaPackage, t); updateAttributes(device, otaPackage, ts, tenantId, deviceId, otaPackageType); } }); diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 449d371a2f..2facdcd305 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -311,7 +311,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { , sessionInfo, getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), - new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) ); } break; @@ -496,21 +496,27 @@ public class CoapTransportResource extends AbstractCoapTransportResource { Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder); int requestId = getNextMsgId(); response.setMID(requestId); + if (msg.getPersisted()) { - transportContext.getRpcAwaitingAck().put(requestId, msg); - transportContext.getScheduler().schedule(() -> { - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); - if (awaitingAckMsg != null) { - transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); - } - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - } - response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { - TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); - if (rpcRequestMsg != null) { - transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + if (isConRequest()) { + transportContext.getRpcAwaitingAck().put(requestId, msg); + transportContext.getScheduler().schedule(() -> { + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId); + if (awaitingAckMsg != null) { + transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY); + } + }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); + if (rpcRequestMsg != null) { + transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY); + } + })); + } else { + transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY); } - })); + } + exchange.respond(response); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 23f3a9cc5f..e09a47972b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -39,7 +39,6 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.common.data.DataConstants; @@ -825,11 +824,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Received RPC command to device", sessionId); try { deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { - RequestInfo requestInfo = publish(payload, deviceSessionCtx); - int msgId = requestInfo.getMsgId(); - - if (isAckExpected(payload)) { - if (rpcRequest.getPersisted()) { + int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); + if (rpcRequest.getPersisted()) { + if (isAckExpected(payload)) { rpcAwaitingAck.put(msgId, rpcRequest); context.getScheduler().schedule(() -> { TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId); @@ -837,8 +834,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY); } }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); + } else { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY); } } + publish(payload, deviceSessionCtx); }); } catch (Exception e) { log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); @@ -855,13 +855,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private RequestInfo publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { + private void publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) { deviceSessionCtx.getChannel().writeAndFlush(message); - - int msgId = ((MqttPublishMessage) message).variableHeader().packetId(); - RequestInfo requestInfo = new RequestInfo(msgId, System.currentTimeMillis(), deviceSessionCtx.getSessionInfo()); - - return requestInfo; } private boolean isAckExpected(MqttMessage message) { @@ -877,11 +872,4 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); } - - @Data - public static class RequestInfo { - private final int msgId; - private final long requestTime; - private final TransportProtos.SessionInfoProto sessionInfo; - } }