From 7d14d46edf693a25a933cc57fe1829bbce090730 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Yusuf=20M=C3=BCcahit=20=C3=87etinkaya?=
Date: Fri, 26 Oct 2018 19:31:35 +0300
Subject: [PATCH] check and recover currentServer zNode on zk.

publishCurrentServer() is now synchronized and creates the
EPHEMERAL_SEQUENTIAL node only when currentServerExists() reports that
no node is registered for this instance.

currentServerExists() returns false when nodePath is null; otherwise it
reads the data stored at nodePath and returns true only if the
deserialized ServerAddress equals this instance's own address. A
KeeperException.NoNodeException, or any other exception raised while
reading, is logged and treated as "does not exist".

childEvent() now reacts to a CHILD_REMOVED event for the instance's own
nodePath by calling publishCurrentServer() again before ignoring the
event.
---
Review notes (text in this area is not part of the commit message):

* The local variable `registeredServerAdress` is misspelled and is
  initialised to null immediately before being assigned; declaration and
  assignment can be merged in a follow-up.
* currentServerExists() returns false for every Exception, so a failed
  read makes publishCurrentServer() attempt to create a new node.
* publishCurrentServer() registers a new connection state listener after
  every successful create. NOTE(review): checkReconnect() is not part of
  this diff -- confirm that repeated registrations are harmless.

 .../cluster/discovery/ZkDiscoveryService.java | 48 +++++++++++++++----
 1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 6f51ceed26..c1876d6b1b 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -29,6 +29,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -49,6 +50,8 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
+import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
+
 /**
  * @author Andrew Shvayka
  */
@@ -121,19 +124,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     }
 
     @Override
-    public void publishCurrentServer() {
+    public synchronized void publishCurrentServer() {
+        ServerInstance self = this.serverInstance.getSelf();
+        if (currentServerExists()) {
+            log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
+        } else {
+            try {
+                log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
+                nodePath = client.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
+                log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+                client.getConnectionStateListenable().addListener(checkReconnect(self));
+            } catch (Exception e) {
+                log.error("Failed to create ZK node", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private boolean currentServerExists() {
+        if (nodePath == null) {
+            return false;
+        }
         try {
             ServerInstance self = this.serverInstance.getSelf();
-            log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
-            nodePath = client.create()
-                    .creatingParentsIfNeeded()
-                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
-            log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
-            client.getConnectionStateListenable().addListener(checkReconnect(self));
+            ServerAddress registeredServerAdress = null;
+            registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
+            if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
+                return true;
+            }
+        } catch (KeeperException.NoNodeException e) {
+            log.info("ZK node does not exist: {}", nodePath);
         } catch (Exception e) {
-            log.error("Failed to create ZK node", e);
-            throw new RuntimeException(e);
+            log.error("Couldn't check if ZK node exists", e);
         }
+        return false;
     }
 
     private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -221,6 +247,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
             log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
             return;
         } else if (nodePath != null && nodePath.equals(data.getPath())) {
+            if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
+                log.info("ZK node for current instance is somehow deleted.");
+                publishCurrentServer();
+            }
             log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
             return;
         }