Merge pull request #12215 from AndriiLandiak/fix/migration-edge-events

Fix edge events migration
This commit is contained in:
Viacheslav Klimov 2024-12-09 15:13:52 +02:00 committed by GitHub
commit ad09575e8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -413,23 +413,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) { if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) {
log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId()); log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, false); sessionNewEvents.put(edgeId, false);
processEdgeEventMigrationIfNeeded(session, edgeId);
session.processHighPriorityEvents(); session.processHighPriorityEvents();
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { processEdgeEventMigrationIfNeeded(session, edgeId);
@Override if (Boolean.TRUE.equals(edgeEventsMigrationProcessed.get(edgeId))) {
public void onSuccess(Boolean newEventsAdded) { Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
if (Boolean.TRUE.equals(newEventsAdded)) { @Override
sessionNewEvents.put(edgeId, true); public void onSuccess(Boolean newEventsAdded) {
if (Boolean.TRUE.equals(newEventsAdded)) {
sessionNewEvents.put(edgeId, true);
}
scheduleEdgeEventsCheck(session);
} }
scheduleEdgeEventsCheck(session);
}
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), t); log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), t);
scheduleEdgeEventsCheck(session); scheduleEdgeEventsCheck(session);
} }
}, ctx.getGrpcCallbackExecutorService()); }, ctx.getGrpcCallbackExecutorService());
} else {
scheduleEdgeEventsCheck(session);
}
} else { } else {
scheduleEdgeEventsCheck(session); scheduleEdgeEventsCheck(session);
} }
@ -457,8 +461,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
scheduleEdgeEventsCheck(session); scheduleEdgeEventsCheck(session);
} else if (Boolean.FALSE.equals(eventsExist)) { } else if (Boolean.FALSE.equals(eventsExist)) {
edgeEventsMigrationProcessed.put(edgeId, true); edgeEventsMigrationProcessed.put(edgeId, true);
} else {
scheduleEdgeEventsCheck(session);
} }
} }
} }