added recalculetePartitions delay for node restart
This commit is contained in:
		
							parent
							
								
									6b8cbbd3c9
								
							
						
					
					
						commit
						aa28b276d2
					
				@ -96,6 +96,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cluster:
 | 
			
		||||
  stats:
 | 
			
		||||
 | 
			
		||||
@ -44,8 +44,10 @@ import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.NoSuchElementException;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.ScheduledExecutorService;
 | 
			
		||||
import java.util.concurrent.ScheduledFuture;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@ -66,6 +68,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
			
		||||
    private Integer zkSessionTimeout;
 | 
			
		||||
    @Value("${zk.zk_dir}")
 | 
			
		||||
    private String zkDir;
 | 
			
		||||
    @Value("${zk.recalculate_delay:120000}")
 | 
			
		||||
    private Long recalculateDelay;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
 | 
			
		||||
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
    private final PartitionService partitionService;
 | 
			
		||||
@ -82,6 +88,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
			
		||||
                              PartitionService partitionService) {
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        delayedTasks = new ConcurrentHashMap<>();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
@ -290,8 +297,30 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
			
		||||
        log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId());
 | 
			
		||||
        switch (pathChildrenCacheEvent.getType()) {
 | 
			
		||||
            case CHILD_ADDED:
 | 
			
		||||
                ScheduledFuture<?> task = delayedTasks.remove(instance.getServiceId());
 | 
			
		||||
                if (task != null) {
 | 
			
		||||
                    if (!task.cancel(false)) {
 | 
			
		||||
                        log.debug("[{}] Going to recalculate partitions due to adding new node [{}]",
 | 
			
		||||
                                instance.getServiceId(), instance.getServiceTypesList());
 | 
			
		||||
                        recalculatePartitions();
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]",
 | 
			
		||||
                                instance.getServiceId(), instance.getServiceTypesList());
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.debug("[{}] Going to recalculate partitions due to adding new node [{}]",
 | 
			
		||||
                            instance.getServiceId(), instance.getServiceTypesList());
 | 
			
		||||
                    recalculatePartitions();
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            case CHILD_REMOVED:
 | 
			
		||||
                recalculatePartitions();
 | 
			
		||||
                ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
 | 
			
		||||
                    log.debug("[{}] Going to recalculate partitions due to removed node [{}]",
 | 
			
		||||
                            instance.getServiceId(), instance.getServiceTypesList());
 | 
			
		||||
                    delayedTasks.remove(instance.getServiceId());
 | 
			
		||||
                    recalculatePartitions();
 | 
			
		||||
                }, recalculateDelay, TimeUnit.MILLISECONDS);
 | 
			
		||||
                delayedTasks.put(instance.getServiceId(), future);
 | 
			
		||||
                break;
 | 
			
		||||
            default:
 | 
			
		||||
                break;
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
queue:
 | 
			
		||||
  type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -68,6 +68,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ zk:
 | 
			
		||||
  session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
 | 
			
		||||
  # Name of the directory in zookeeper 'filesystem'
 | 
			
		||||
  zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
 | 
			
		||||
  recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
 | 
			
		||||
 | 
			
		||||
cache:
 | 
			
		||||
  type: "${CACHE_TYPE:redis}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user