diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e7fbbd2a3d..9cec475335 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,6 +96,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index fcf80bcf3d..17d046a4cb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -44,8 +44,10 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -66,6 +68,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; + @Value("${zk.recalculate_delay:120000}") + private Long recalculateDelay; + + private final ConcurrentHashMap> delayedTasks; private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; @@ -82,6 +88,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi PartitionService partitionService) { this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; + delayedTasks = new ConcurrentHashMap<>(); } @PostConstruct @@ -290,8 +297,30 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: + ScheduledFuture task = delayedTasks.remove(instance.getServiceId()); + if (task != null) { + if (!task.cancel(false)) { + log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + recalculatePartitions(); + } else { + log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + } + } else { + log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + recalculatePartitions(); + } + break; case CHILD_REMOVED: - recalculatePartitions(); + ScheduledFuture future = zkExecutorService.schedule(() -> { + log.debug("[{}] Going to recalculate partitions due to removed node [{}]", + instance.getServiceId(), instance.getServiceTypesList()); + delayedTasks.remove(instance.getServiceId()); + recalculatePartitions(); + }, recalculateDelay, TimeUnit.MILLISECONDS); + delayedTasks.put(instance.getServiceId(), future); break; default: break; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 094e0e2099..2c90082eb5 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index b9db930657..aef46a1234 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index bff7adb561..4bce6e28d7 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,6 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index ae8f0138a7..eab5b107c8 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 076dde0234..f0968aa6b9 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index c68c9c56a8..c7dcd70574 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,6 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" cache: type: "${CACHE_TYPE:redis}"