From fcb32c29f912e2a62f3a01528c4de6a500b3feb2 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 9 Dec 2024 15:08:48 +0200 Subject: [PATCH] Fix migration from postgres to kafka for edge-events --- .../service/edge/rpc/EdgeGrpcService.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 71387aa542..2e9465fc9d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -413,23 +413,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (Boolean.TRUE.equals(sessionNewEvents.get(edgeId))) { log.trace("[{}][{}] Set session new events flag to false", tenantId, edgeId.getId()); sessionNewEvents.put(edgeId, false); - processEdgeEventMigrationIfNeeded(session, edgeId); session.processHighPriorityEvents(); - Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { - @Override - public void onSuccess(Boolean newEventsAdded) { - if (Boolean.TRUE.equals(newEventsAdded)) { - sessionNewEvents.put(edgeId, true); + processEdgeEventMigrationIfNeeded(session, edgeId); + if (Boolean.TRUE.equals(edgeEventsMigrationProcessed.get(edgeId))) { + Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { + @Override + public void onSuccess(Boolean newEventsAdded) { + if (Boolean.TRUE.equals(newEventsAdded)) { + sessionNewEvents.put(edgeId, true); + } + scheduleEdgeEventsCheck(session); } - scheduleEdgeEventsCheck(session); - } - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), t); - scheduleEdgeEventsCheck(session); - } - }, ctx.getGrpcCallbackExecutorService()); + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), t); + scheduleEdgeEventsCheck(session); + } + }, ctx.getGrpcCallbackExecutorService()); + } else { + scheduleEdgeEventsCheck(session); + } } else { scheduleEdgeEventsCheck(session); } @@ -457,8 +461,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i scheduleEdgeEventsCheck(session); } else if (Boolean.FALSE.equals(eventsExist)) { edgeEventsMigrationProcessed.put(edgeId, true); - } else { - scheduleEdgeEventsCheck(session); } } }