Merge pull request #8378 from volodymyr-babak/edge-interruption-improved
Edge - improved logic of interruption of sending downlink tasks
This commit is contained in:
commit
fad706b412
@ -288,7 +288,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
if (session != null) {
|
||||
boolean success = false;
|
||||
if (session.isConnected()) {
|
||||
session.startSyncProcess(tenantId, edgeId, true);
|
||||
session.startSyncProcess(true);
|
||||
success = true;
|
||||
}
|
||||
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));
|
||||
|
||||
@ -148,7 +148,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
if (requestMsg.getSyncRequestMsg().hasFullSync()) {
|
||||
fullSync = requestMsg.getSyncRequestMsg().getFullSync();
|
||||
}
|
||||
startSyncProcess(edge.getTenantId(), edge.getId(), fullSync);
|
||||
startSyncProcess(fullSync);
|
||||
} else {
|
||||
syncCompleted = true;
|
||||
}
|
||||
@ -192,10 +192,10 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
};
|
||||
}
|
||||
|
||||
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) {
|
||||
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
|
||||
public void startSyncProcess(boolean fullSync) {
|
||||
log.trace("[{}][{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId(), this.sessionId);
|
||||
syncCompleted = false;
|
||||
interruptGeneralProcessingOnSync(tenantId, edgeId);
|
||||
interruptGeneralProcessingOnSync();
|
||||
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
|
||||
}
|
||||
|
||||
@ -221,9 +221,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
|
||||
.build();
|
||||
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<Void>() {
|
||||
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Void result) {
|
||||
public void onSuccess(Boolean isInterrupted) {
|
||||
syncCompleted = true;
|
||||
ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId());
|
||||
}
|
||||
@ -272,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
if (sessionState.getPendingMsgsMap().isEmpty()) {
|
||||
log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
stopCurrentSendDownlinkMsgsTask(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
|
||||
@ -367,9 +367,13 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
if (isConnected() && !pageData.getData().isEmpty()) {
|
||||
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
|
||||
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
|
||||
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<Void>() {
|
||||
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void tmp) {
|
||||
public void onSuccess(@Nullable Boolean isInterrupted) {
|
||||
if (Boolean.TRUE.equals(isInterrupted)) {
|
||||
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", edge.getTenantId(), edge.getId(), sessionId);
|
||||
result.set(null);
|
||||
} else {
|
||||
if (isConnected() && pageData.hasNext()) {
|
||||
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
|
||||
} else {
|
||||
@ -377,6 +381,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
result.set(ifOffset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
@ -394,7 +399,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
|
||||
private ListenableFuture<Boolean> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
|
||||
interruptPreviousSendDownlinkMsgsTask();
|
||||
|
||||
sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
|
||||
@ -433,14 +438,14 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
} else {
|
||||
log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
|
||||
this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
stopCurrentSendDownlinkMsgsTask(false);
|
||||
}
|
||||
} else {
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
stopCurrentSendDownlinkMsgsTask(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e);
|
||||
stopCurrentSendDownlinkMsgsTask(e);
|
||||
stopCurrentSendDownlinkMsgsTask(true);
|
||||
}
|
||||
};
|
||||
|
||||
@ -688,23 +693,18 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
|
||||
private void interruptPreviousSendDownlinkMsgsTask() {
|
||||
String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
|
||||
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
|
||||
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
|
||||
stopCurrentSendDownlinkMsgsTask(true);
|
||||
}
|
||||
|
||||
private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) {
|
||||
String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
|
||||
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
|
||||
private void interruptGeneralProcessingOnSync() {
|
||||
log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", edge.getTenantId(), edge.getId(), this.sessionId);
|
||||
stopCurrentSendDownlinkMsgsTask(true);
|
||||
}
|
||||
|
||||
public void stopCurrentSendDownlinkMsgsTask(Exception e) {
|
||||
public void stopCurrentSendDownlinkMsgsTask(Boolean isInterrupted) {
|
||||
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
|
||||
if (e != null) {
|
||||
log.debug(e.getMessage());
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(e);
|
||||
} else {
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
}
|
||||
sessionState.getSendDownlinkMsgsFuture().set(isInterrupted);
|
||||
}
|
||||
if (sessionState.getScheduledSendDownlinkTask() != null) {
|
||||
sessionState.getScheduledSendDownlinkTask().cancel(true);
|
||||
|
||||
@ -28,6 +28,6 @@ import java.util.concurrent.ScheduledFuture;
|
||||
public class EdgeSessionState {
|
||||
|
||||
private final Map<Integer, DownlinkMsg> pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>());
|
||||
private SettableFuture<Void> sendDownlinkMsgsFuture;
|
||||
private SettableFuture<Boolean> sendDownlinkMsgsFuture;
|
||||
private ScheduledFuture<?> scheduledSendDownlinkTask;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user