From a5cfe079996e53d38b3a57038c7013915878a00f Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 17 Apr 2023 16:36:33 +0300 Subject: [PATCH] Edge - improved logic of interruption of sending downlink tasks --- .../service/edge/rpc/EdgeGrpcService.java | 2 +- .../service/edge/rpc/EdgeGrpcSession.java | 58 +++++++++---------- .../service/edge/rpc/EdgeSessionState.java | 2 +- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index f273517b74..5171a6037f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -288,7 +288,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (session != null) { boolean success = false; if (session.isConnected()) { - session.startSyncProcess(tenantId, edgeId, true); + session.startSyncProcess(true); success = true; } clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success)); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9e3a22f64f..33aed92682 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -148,7 +148,7 @@ public final class EdgeGrpcSession implements Closeable { if (requestMsg.getSyncRequestMsg().hasFullSync()) { fullSync = requestMsg.getSyncRequestMsg().getFullSync(); } - startSyncProcess(edge.getTenantId(), edge.getId(), fullSync); + startSyncProcess(fullSync); } else { syncCompleted = true; } @@ -192,10 +192,10 @@ public final class EdgeGrpcSession implements Closeable { }; } - public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) { - log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); + public void startSyncProcess(boolean fullSync) { + log.trace("[{}][{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId(), this.sessionId); syncCompleted = false; - interruptGeneralProcessingOnSync(tenantId, edgeId); + interruptGeneralProcessingOnSync(); doSync(new EdgeSyncCursor(ctx, edge, fullSync)); } @@ -221,9 +221,9 @@ public final class EdgeGrpcSession implements Closeable { .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) .build(); - Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback() { + Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() { @Override - public void onSuccess(Void result) { + public void onSuccess(Boolean isInterrupted) { syncCompleted = true; ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId()); } @@ -272,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable { } if (sessionState.getPendingMsgsMap().isEmpty()) { log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey()); - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); @@ -367,14 +367,19 @@ public final class EdgeGrpcSession implements Closeable { if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); - Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback() { + Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { @Override - public void onSuccess(@Nullable Void tmp) { - if (isConnected() && pageData.hasNext()) { - processEdgeEvents(fetcher, pageLink.nextPageLink(), result); + public void onSuccess(@Nullable Boolean isInterrupted) { + if (Boolean.TRUE.equals(isInterrupted)) { + log.debug("[{}][{}][{}] Send downlink messages task was interrupted", edge.getTenantId(), edge.getId(), sessionId); + result.set(null); } else { - UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); - result.set(ifOffset); + if (isConnected() && pageData.hasNext()) { + processEdgeEvents(fetcher, pageLink.nextPageLink(), result); + } else { + UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + result.set(ifOffset); + } } } @@ -394,7 +399,7 @@ public final class EdgeGrpcSession implements Closeable { } } - private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { + private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { interruptPreviousSendDownlinkMsgsTask(); sessionState.setSendDownlinkMsgsFuture(SettableFuture.create()); @@ -433,14 +438,14 @@ public final class EdgeGrpcSession implements Closeable { } else { log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy); - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } else { - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e); - stopCurrentSendDownlinkMsgsTask(e); + stopCurrentSendDownlinkMsgsTask(true); } }; @@ -688,23 +693,18 @@ public final class EdgeGrpcSession implements Closeable { } private void interruptPreviousSendDownlinkMsgsTask() { - String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId); - stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg)); + log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId); + stopCurrentSendDownlinkMsgsTask(true); } - private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) { - String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId); - stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg)); + private void interruptGeneralProcessingOnSync() { + log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", edge.getTenantId(), edge.getId(), this.sessionId); + stopCurrentSendDownlinkMsgsTask(true); } - public void stopCurrentSendDownlinkMsgsTask(Exception e) { + public void stopCurrentSendDownlinkMsgsTask(Boolean isInterrupted) { if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) { - if (e != null) { - log.debug(e.getMessage()); - sessionState.getSendDownlinkMsgsFuture().setException(e); - } else { - sessionState.getSendDownlinkMsgsFuture().set(null); - } + sessionState.getSendDownlinkMsgsFuture().set(isInterrupted); } if (sessionState.getScheduledSendDownlinkTask() != null) { sessionState.getScheduledSendDownlinkTask().cancel(true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java index 59910c829f..09a7c5add8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java @@ -28,6 +28,6 @@ import java.util.concurrent.ScheduledFuture; public class EdgeSessionState { private final Map pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>()); - private SettableFuture sendDownlinkMsgsFuture; + private SettableFuture sendDownlinkMsgsFuture; private ScheduledFuture scheduledSendDownlinkTask; }