diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 5516762783..c2b5b863fe 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -400,6 +400,14 @@ public class ActorSystemContext { @Getter private String debugPerTenantLimitsConfiguration; + @Value("${actors.rpc.sequence.enabled:true}") + @Getter + private boolean rpcSequenceEnabled; + + @Value("${actors.rpc.persistent.retries:5}") + @Getter + private int maxPersistentRpcRetries; + @Getter @Setter private TbActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index eb635b69c1..0fa55083cd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -121,6 +121,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private final Map attributeSubscriptions; private final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; + private final boolean rpcSequenceEnabled; private int rpcSeq = 0; private String deviceName; @@ -132,6 +133,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { super(systemContext); this.tenantId = tenantId; this.deviceId = deviceId; + this.rpcSequenceEnabled = systemContext.isRpcSequenceEnabled(); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.toDeviceRpcPendingMap = new LinkedHashMap<>(); @@ -185,19 +187,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (timeout <= 0) { log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); if (persisted) { - createRpc(request, RpcStatus.TIMEOUT); + createRpc(request, RpcStatus.EXPIRED); } return; } else if (persisted) { createRpc(request, RpcStatus.QUEUED); } - boolean sent; + boolean sent = false; if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId()); saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); sent = true; - } else { + } else if (!rpcSequenceEnabled || toDeviceRpcPendingMap.isEmpty()) { sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); rpcSubscriptions.forEach((key, value) -> { @@ -292,7 +294,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (requestMd != null) { log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); if (requestMd.getMsg().getMsg().isPersisted()) { - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null); } systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); @@ -300,7 +302,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void sendPendingRequest(TbActorCtx context, UUID sessionId, String nodeId) { + private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) { SessionType sessionType = getSessionType(sessionId); if (!toDeviceRpcPendingMap.isEmpty()) { log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); @@ -312,11 +314,34 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); } Set sentOneWayIds = new HashSet<>(); - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); + + if (sessionType == SessionType.ASYNC) { + if (rpcSequenceEnabled) { + List> entries = new ArrayList<>(); + for (Map.Entry entry : toDeviceRpcPendingMap.entrySet()) { + if (entry.getValue().isDelivered()) { + continue; + } + entries.add(entry); + if (entry.getValue().getMsg().getMsg().isPersisted() || entry.getValue().getMsg().getMsg().isOneway()) { + break; + } + } + entries.forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); + } else { + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); + } + } else { + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); + } + + sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove); } private void sendNextPendingRequest(TbActorCtx context) { - rpcSubscriptions.forEach((id, s) -> sendPendingRequest(context, id, s.getNodeId())); + if (rpcSequenceEnabled) { + rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId())); + } } private Consumer> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set sentOneWayIds) { @@ -338,11 +363,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setPersisted(request.isPersisted()) .build(); sendToTransport(rpcRequest, sessionId, nodeId); - - if (SessionType.ASYNC.equals(getSessionType(sessionId)) && request.isOneway() && !request.isPersisted()) { - toDeviceRpcPendingMap.remove(entry.getKey()); - sendPendingRequest(context, sessionId, nodeId); - } }; } @@ -361,7 +381,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); } if (msg.hasSendPendingRPC()) { - sendPendingRequest(context, getSessionId(sessionInfo), sessionInfo.getNodeId()); + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId()); } if (msg.hasGetAttributes()) { handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); @@ -559,16 +579,28 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); - - ToDeviceRpcRequestMetadata md; - if (RpcStatus.DELIVERED.equals(status)) { - md = toDeviceRpcPendingMap.get(responseMsg.getRequestId()); - } else { - md = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); - } + ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId()); if (md != null) { + if (status.equals(RpcStatus.DELIVERED)) { + if (md.getMsg().getMsg().isOneway()) { + toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); + } else { + md.setDelivered(true); + } + } else if (status.equals(RpcStatus.TIMEOUT)) { + if (systemContext.getMaxPersistentRpcRetries() <= md.getRetries()) { + toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); + status = RpcStatus.FAILED; + } else { + md.setRetries(md.getRetries() + 1); + } + } + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null); + if (status != RpcStatus.SENT) { + sendNextPendingRequest(context); + } } else { log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId); } @@ -608,7 +640,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionMD.setSubscribedToRPC(true); log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - sendPendingRequest(context, sessionId, sessionInfo.getNodeId()); + sendPendingRequests(context, sessionId, sessionInfo.getNodeId()); dumpSessions(); } } @@ -884,7 +916,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class); long timeout = rpc.getExpirationTime() - System.currentTimeMillis(); if (timeout <= 0) { - rpc.setStatus(RpcStatus.TIMEOUT); + rpc.setStatus(RpcStatus.EXPIRED); systemContext.getTbRpcService().save(tenantId, rpc); } else { registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java index 44a2e0f3de..2b10b0cba0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java @@ -25,4 +25,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; public class ToDeviceRpcRequestMetadata { private final ToDeviceRpcRequestActorMsg msg; private final boolean sent; + private int retries; + private boolean delivered; } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 74572e8ccc..67260e4d33 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -326,6 +326,11 @@ actors: queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}" # Time in milliseconds for transaction to complete duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}" + rpc: + persistent: + retries: "${ACTORS_RPC_PERSISTENT_RETRIES:5}" + sequence: + enabled: "${ACTORS_RPC_SEQUENCE_ENABLED:true}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java index c80d0c5993..43592fde0c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java @@ -16,5 +16,5 @@ package org.thingsboard.server.common.data.rpc; public enum RpcStatus { - QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED + QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index 4f488b5d7b..90f4f63bd0 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.SessionMsgListener; @@ -532,7 +533,7 @@ public class DefaultCoapClientContext implements CoapClientContext { response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); if (rpcRequestMsg != null) { - transportService.process(state.getSession(), rpcRequestMsg, TransportServiceCallback.EMPTY); + transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } }, null)); } @@ -553,8 +554,12 @@ public class DefaultCoapClientContext implements CoapClientContext { transportService.process(state.getSession(), TransportProtos.ToDeviceRpcResponseMsg.newBuilder() .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY); - } else if (msg.getPersisted() && !conRequest && sent) { - transportService.process(state.getSession(), msg, TransportServiceCallback.EMPTY); + } else if (msg.getPersisted() && sent) { + if (conRequest) { + transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY); + } else { + transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } } } } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index aab76e350c..c7e44f0ee3 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -409,7 +409,7 @@ public class DeviceApiController implements TbTransportService { public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) { log.trace("[{}] Received RPC command to device", sessionId); responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); - transportService.process(sessionInfo, msg, TransportServiceCallback.EMPTY); + transportService.process(sessionInfo, msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java index 7ac8026a4c..8135b5aae0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java @@ -21,7 +21,9 @@ import org.eclipse.leshan.core.ResponseCode; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; @@ -158,6 +160,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); } } + transportService.process(client.getSession(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); } catch (IllegalArgumentException e) { this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage()); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java index c94c9c5805..a65ac4dfaf 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java @@ -19,6 +19,7 @@ import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.request.exception.ClientSleepingException; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; @@ -44,7 +45,7 @@ public abstract class RpcDownlinkRequestCallbackProxy implements DownlinkR @Override public void onSuccess(R request, T response) { - transportService.process(client.getSession(), this.request, TransportServiceCallback.EMPTY); + transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); sendRpcReplyOnSuccess(response); if (callback != null) { callback.onSuccess(request, response); @@ -61,7 +62,9 @@ public abstract class RpcDownlinkRequestCallbackProxy implements DownlinkR @Override public void onError(String params, Exception e) { - if (!(e instanceof TimeoutException || e instanceof ClientSleepingException)) { + if (e instanceof TimeoutException) { + transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); + } else if (!(e instanceof ClientSleepingException)) { sendRpcReplyOnError(e); } if (callback != null) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fa8c2599eb..430ce2cfbb 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.ota.OtaPackageType; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; @@ -272,7 +273,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId(); TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId); if (rpcRequest != null) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY); + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } break; default: @@ -856,10 +857,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); } var cf = publish(payload, deviceSessionCtx); - if (rpcRequest.getPersisted() && !isAckExpected(payload)) { + if (rpcRequest.getPersisted()) { cf.addListener(result -> { if (result.cause() == null) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY); + if (isAckExpected(payload)) { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); + } else { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } } }); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 21086d75d8..2713a36bb8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -16,8 +16,10 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.channel.ChannelFuture; +import io.netty.handler.codec.mqtt.MqttMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; @@ -102,9 +104,13 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple payload -> { ChannelFuture channelFuture = parent.writeAndFlush(payload); if (request.getPersisted()) { - channelFuture.addListener(future -> { - if (future.cause() == null) { - transportService.process(getSessionInfo(), request, TransportServiceCallback.EMPTY); + channelFuture.addListener(result -> { + if (result.cause() == null) { + if (isAckExpected(payload)) { + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY); + } else { + transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } } }); } @@ -129,4 +135,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple // This feature is not supported in the TB IoT Gateway yet. } + private boolean isAckExpected(MqttMessage message) { + return message.fixedHeader().qosLevel().value() > 0; + } + } diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index 1927aadc56..6d9238d6bc 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; @@ -142,7 +143,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) { log.trace("[{}] Received RPC command to device", sessionId); snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); - snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, TransportServiceCallback.EMPTY); + snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 237954c553..c5657260d7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.service.SessionMetaData; @@ -112,7 +113,7 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); - void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback callback); void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index c705d8364a..be87436bd3 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -580,15 +580,13 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, TransportServiceCallback callback) { + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback callback) { if (msg.getPersisted()) { - RpcStatus status = msg.getOneway() ? RpcStatus.SUCCESSFUL : RpcStatus.DELIVERED; - TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() .setRequestId(msg.getRequestId()) .setRequestIdLSB(msg.getRequestIdLSB()) .setRequestIdMSB(msg.getRequestIdMSB()) - .setStatus(status.name()) + .setStatus(rpcStatus.name()) .build(); if (checkLimits(sessionInfo, responseMsg, callback)) {