EdgeGrpcSession - move check of empty high priority msgs into submethod

This commit is contained in:
Volodymyr Babak 2024-12-23 20:46:07 +02:00
parent 509ff28f2e
commit d5e973283c

View File

@ -292,9 +292,7 @@ public abstract class EdgeGrpcSession implements Closeable {
protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture<Pair<Long, Long>> result) {
try {
if (!highPriorityQueue.isEmpty()) {
processHighPriorityEvents();
}
processHighPriorityEvents();
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, sessionId, pageData.getData().size());
@ -537,6 +535,9 @@ public abstract class EdgeGrpcSession implements Closeable {
public void processHighPriorityEvents() {
try {
if (isConnected() && isSyncCompleted()) {
if (highPriorityQueue.isEmpty()) {
return;
}
List<EdgeEvent> highPriorityEvents = new ArrayList<>();
EdgeEvent event;
while ((event = highPriorityQueue.poll()) != null) {
@ -606,8 +607,13 @@ public abstract class EdgeGrpcSession implements Closeable {
}
}, ctx.getGrpcCallbackExecutorService());
} else {
log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId);
result.set(null);
if (!isSyncCompleted()) {
log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId);
result.set(Boolean.TRUE);
} else {
log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, sessionId);
result.set(null);
}
}
return result;
}