diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index ad2a97de52..3ad6164c61 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -143,6 +143,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } else { log.info("Received application ready event. Starting current ZK node."); } + subscribeToEvents(); if (client.getState() != CuratorFrameworkState.STARTED) { log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState()); return; @@ -212,6 +213,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi try { destroyZkClient(); initZkClient(); + subscribeToEvents(); publishCurrentServer(); } catch (Exception e) { log.error("Failed to reconnect to ZK: {}", e.getMessage(), e); @@ -227,7 +229,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi client.start(); client.blockUntilConnected(); cache = new PathChildrenCache(client, zkNodesDir, true); - cache.getListenable().addListener(this); cache.start(); stopped = false; log.info("ZK client connected"); @@ -239,6 +240,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } } + private void subscribeToEvents() { + cache.getListenable().addListener(this); + } + private void unpublishCurrentServer() { try { if (nodePath != null) {