From f6b00b35db3b14d40c99f55adcb0eebc8d4289d6 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 1 Nov 2018 20:52:48 +0200 Subject: [PATCH] Improve docker compose. Improve TB shutdown logic. --- .../cluster/discovery/ZkDiscoveryService.java | 22 +++++++++++++++++++ .../server/kafka/TbKafkaResponseTemplate.java | 5 +++++ docker/docker-compose.yml | 2 ++ 3 files changed, 29 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java index 997058ce41..f5321e5806 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -98,6 +99,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private PathChildrenCache cache; private String nodePath; + private volatile boolean stopped = false; + @PostConstruct public void init() { log.info("Initializing..."); @@ -118,6 +121,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi cache.start(); } catch (Exception e) { log.error("Failed to connect to ZK: {}", e.getMessage(), e); + CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(client); throw new RuntimeException(e); } @@ -125,7 +129,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @PreDestroy public void destroy() { + stopped = true; unpublishCurrentServer(); + CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(client); log.info("Stopped discovery service"); } @@ -228,6 +234,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { + if (stopped) { + log.debug("Ignoring application ready event. Service is stopped."); + return; + } + if (client.getState() != CuratorFrameworkState.STARTED) { + log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState()); + return; + } publishCurrentServer(); getOtherServers().forEach( server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort()) @@ -236,6 +250,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { + if (stopped) { + log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent); + return; + } + if (client.getState() != CuratorFrameworkState.STARTED) { + log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState()); + return; + } ChildData data = pathChildrenCacheEvent.getData(); if (data == null) { log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent); diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java index 4c23ac2211..e10cd3c917 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java @@ -18,6 +18,7 @@ package org.thingsboard.server.kafka; import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; @@ -127,6 +128,10 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT log.warn("[{}] Failed to process the request: {}", requestId, request, e); } }); + } catch (InterruptException ie) { + if (!stopped) { + log.warn("Fetching data from kafka was interrupted.", ie); + } } catch (Throwable e) { log.warn("Failed to obtain messages from queue.", e); try { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 0a09d8a024..b9d8fd1a54 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -64,6 +64,7 @@ services: depends_on: - kafka - redis + - tb-js-executor tb2: restart: always image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" @@ -84,6 +85,7 @@ services: depends_on: - kafka - redis + - tb-js-executor tb-mqtt-transport1: restart: always image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"