Edge - improved logic of interruption of sending downlink tasks

This commit is contained in:
Volodymyr Babak 2023-04-17 16:36:33 +03:00
parent cbbbfe2388
commit a5cfe07999
3 changed files with 31 additions and 31 deletions

View File

@ -288,7 +288,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (session != null) {
boolean success = false;
if (session.isConnected()) {
session.startSyncProcess(tenantId, edgeId, true);
session.startSyncProcess(true);
success = true;
}
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));

View File

@ -148,7 +148,7 @@ public final class EdgeGrpcSession implements Closeable {
if (requestMsg.getSyncRequestMsg().hasFullSync()) {
fullSync = requestMsg.getSyncRequestMsg().getFullSync();
}
startSyncProcess(edge.getTenantId(), edge.getId(), fullSync);
startSyncProcess(fullSync);
} else {
syncCompleted = true;
}
@ -192,10 +192,10 @@ public final class EdgeGrpcSession implements Closeable {
};
}
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) {
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
public void startSyncProcess(boolean fullSync) {
log.trace("[{}][{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId(), this.sessionId);
syncCompleted = false;
interruptGeneralProcessingOnSync(tenantId, edgeId);
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
}
@ -221,9 +221,9 @@ public final class EdgeGrpcSession implements Closeable {
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<Void>() {
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
public void onSuccess(Boolean isInterrupted) {
syncCompleted = true;
ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId());
}
@ -272,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable {
}
if (sessionState.getPendingMsgsMap().isEmpty()) {
log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} catch (Exception e) {
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
@ -367,14 +367,19 @@ public final class EdgeGrpcSession implements Closeable {
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<Void>() {
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
if (isConnected() && pageData.hasNext()) {
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
public void onSuccess(@Nullable Boolean isInterrupted) {
if (Boolean.TRUE.equals(isInterrupted)) {
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", edge.getTenantId(), edge.getId(), sessionId);
result.set(null);
} else {
UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
result.set(ifOffset);
if (isConnected() && pageData.hasNext()) {
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
} else {
UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
result.set(ifOffset);
}
}
}
@ -394,7 +399,7 @@ public final class EdgeGrpcSession implements Closeable {
}
}
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
private ListenableFuture<Boolean> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
interruptPreviousSendDownlinkMsgsTask();
sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
@ -433,14 +438,14 @@ public final class EdgeGrpcSession implements Closeable {
} else {
log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} else {
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} catch (Exception e) {
log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e);
stopCurrentSendDownlinkMsgsTask(e);
stopCurrentSendDownlinkMsgsTask(true);
}
};
@ -688,23 +693,18 @@ public final class EdgeGrpcSession implements Closeable {
}
private void interruptPreviousSendDownlinkMsgsTask() {
String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
stopCurrentSendDownlinkMsgsTask(true);
}
private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) {
String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
private void interruptGeneralProcessingOnSync() {
log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", edge.getTenantId(), edge.getId(), this.sessionId);
stopCurrentSendDownlinkMsgsTask(true);
}
public void stopCurrentSendDownlinkMsgsTask(Exception e) {
public void stopCurrentSendDownlinkMsgsTask(Boolean isInterrupted) {
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
if (e != null) {
log.debug(e.getMessage());
sessionState.getSendDownlinkMsgsFuture().setException(e);
} else {
sessionState.getSendDownlinkMsgsFuture().set(null);
}
sessionState.getSendDownlinkMsgsFuture().set(isInterrupted);
}
if (sessionState.getScheduledSendDownlinkTask() != null) {
sessionState.getScheduledSendDownlinkTask().cancel(true);

View File

@ -28,6 +28,6 @@ import java.util.concurrent.ScheduledFuture;
public class EdgeSessionState {
private final Map<Integer, DownlinkMsg> pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>());
private SettableFuture<Void> sendDownlinkMsgsFuture;
private SettableFuture<Boolean> sendDownlinkMsgsFuture;
private ScheduledFuture<?> scheduledSendDownlinkTask;
}