From aa0179303cbe94252777d98b8b7a1579ad49aeda Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 27 Feb 2025 12:45:22 +0200 Subject: [PATCH] Subscribe to Zookeeper events only after application starts Sometimes ZK events came before the app start, causing repartition change events, while event listeners are not yet initialized by Spring --- .../server/queue/discovery/ZkDiscoveryService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index ad2a97de52..3ad6164c61 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -143,6 +143,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } else { log.info("Received application ready event. Starting current ZK node."); } + subscribeToEvents(); if (client.getState() != CuratorFrameworkState.STARTED) { log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState()); return; @@ -212,6 +213,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi try { destroyZkClient(); initZkClient(); + subscribeToEvents(); publishCurrentServer(); } catch (Exception e) { log.error("Failed to reconnect to ZK: {}", e.getMessage(), e); @@ -227,7 +229,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi client.start(); client.blockUntilConnected(); cache = new PathChildrenCache(client, zkNodesDir, true); - cache.getListenable().addListener(this); cache.start(); stopped = false; log.info("ZK client connected"); @@ -239,6 +240,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } } + private void subscribeToEvents() { + cache.getListenable().addListener(this); + } + private void unpublishCurrentServer() { try { if (nodePath != null) {