diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java index ff587f50c5..9534be8516 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java @@ -117,6 +117,10 @@ public class LwM2mClient implements Serializable { @Getter private final AtomicInteger retryAttempts; + @Getter + @Setter + private UUID lastSentRpcId; + public Object clone() throws CloneNotSupportedException { return super.clone(); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java index 681ae6e8f0..2ec0122c28 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java @@ -319,6 +319,10 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im Registration registration = client.getRegistration(); try { logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), pathToStringFunction.apply(request))); + if (!callback.onSent(request)) { + return; + } + context.getServer().send(registration, request, timeoutInMs, response -> { executor.submit(() -> { try { @@ -330,7 +334,6 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im } }); }, e -> handleDownlinkError(client, request, callback, e)); - callback.onSent(request); } catch (Exception e) { handleDownlinkError(client, request, callback, e); } @@ -366,6 +369,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im private , T extends LwM2mResponse> void handleDownlinkError(LwM2mClient client, R request, DownlinkRequestCallback callback, Exception e) { log.trace("[{}] Received downlink error: {}.", client.getEndpoint(), e); + client.updateLastUplinkTime(); executor.submit(() -> { if (e instanceof TimeoutException || e instanceof ClientSleepingException) { log.trace("[{}] Received {}, client is probably sleeping", client.getEndpoint(), e.getClass().getSimpleName()); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java index 2dc99479c1..8912c51960 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java @@ -17,7 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; public interface DownlinkRequestCallback { - default void onSent(R request){}; + default boolean onSent(R request){ + return true; + }; void onSuccess(R request, T response); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java index bdc5dcc819..118961850c 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java @@ -95,6 +95,12 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); return; } + UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB()); + + if (rpcId.equals(client.getLastSentRpcId())) { + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId); + return; + } try { if (operationType.isHasObjectId()) { String objectId = getIdFromParameters(client, rpcRequest); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java index 5361682491..6221a68149 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/RpcDownlinkRequestCallbackProxy.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.lwm2m.server.rpc; +import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.request.exception.ClientSleepingException; import org.thingsboard.common.util.JacksonUtil; @@ -26,8 +27,10 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; +import java.util.UUID; import java.util.concurrent.TimeoutException; +@Slf4j public abstract class RpcDownlinkRequestCallbackProxy implements DownlinkRequestCallback { private final TransportService transportService; @@ -44,8 +47,20 @@ public abstract class RpcDownlinkRequestCallbackProxy implements DownlinkR } @Override - public void onSent(R request) { + public boolean onSent(R request) { + client.lock(); + try { + UUID rpcId = new UUID(this.request.getRequestIdMSB(), this.request.getRequestIdLSB()); + if (rpcId.equals(client.getLastSentRpcId())) { + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId); + return false; + } + client.setLastSentRpcId(rpcId); + } finally { + client.unlock(); + } transportService.process(client.getSession(), this.request, RpcStatus.SENT, TransportServiceCallback.EMPTY); + return true; } @Override @@ -68,6 +83,7 @@ public abstract class RpcDownlinkRequestCallbackProxy implements DownlinkR @Override public void onError(String params, Exception e) { if (e instanceof TimeoutException || e instanceof org.eclipse.leshan.core.request.exception.TimeoutException) { + client.setLastSentRpcId(null); transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } else if (!(e instanceof ClientSleepingException)) { sendRpcReplyOnError(e);