Fix migration from postgres to kafka for edge-events

This commit is contained in:
Andrii Landiak 2024-12-09 15:08:48 +02:00
parent ccddca23a1
commit fcb32c29f9

View File

@ -413,8 +413,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) {
log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, false);
processEdgeEventMigrationIfNeeded(session, edgeId);
session.processHighPriorityEvents();
processEdgeEventMigrationIfNeeded(session, edgeId);
if (Boolean.TRUE.equals(edgeEventsMigrationProcessed.get(edgeId))) {
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
@Override
public void onSuccess(Boolean newEventsAdded) {
@ -433,6 +434,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} else {
scheduleEdgeEventsCheck(session);
}
} else {
scheduleEdgeEventsCheck(session);
}
} finally {
newEventLock.unlock();
}
@ -457,8 +461,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
scheduleEdgeEventsCheck(session);
} else if (Boolean.FALSE.equals(eventsExist)) {
edgeEventsMigrationProcessed.put(edgeId, true);
} else {
scheduleEdgeEventsCheck(session);
}
}
}