added recalculetePartitions delay for node restart

This commit is contained in:
YevhenBondarenko 2023-06-16 15:40:29 +02:00 committed by Andrii Shvaika
parent 1248b66004
commit 68149d9673
8 changed files with 37 additions and 1 deletions

View File

@ -96,6 +96,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cluster: cluster:
stats: stats:

View File

@ -44,8 +44,10 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -66,6 +68,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private Integer zkSessionTimeout; private Integer zkSessionTimeout;
@Value("${zk.zk_dir}") @Value("${zk.zk_dir}")
private String zkDir; private String zkDir;
@Value("${zk.recalculate_delay:120000}")
private Long recalculateDelay;
private final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService; private final PartitionService partitionService;
@ -82,6 +88,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
PartitionService partitionService) { PartitionService partitionService) {
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService; this.partitionService = partitionService;
delayedTasks = new ConcurrentHashMap<>();
} }
@PostConstruct @PostConstruct
@ -290,8 +297,30 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId());
switch (pathChildrenCacheEvent.getType()) { switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED: case CHILD_ADDED:
ScheduledFuture<?> task = delayedTasks.remove(instance.getServiceId());
if (task != null) {
if (!task.cancel(false)) {
log.debug("[{}] Going to recalculate partitions due to adding new node [{}]",
instance.getServiceId(), instance.getServiceTypesList());
recalculatePartitions();
} else {
log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]",
instance.getServiceId(), instance.getServiceTypesList());
}
} else {
log.debug("[{}] Going to recalculate partitions due to adding new node [{}]",
instance.getServiceId(), instance.getServiceTypesList());
recalculatePartitions();
}
break;
case CHILD_REMOVED: case CHILD_REMOVED:
recalculatePartitions(); ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
log.debug("[{}] Going to recalculate partitions due to removed node [{}]",
instance.getServiceId(), instance.getServiceTypesList());
delayedTasks.remove(instance.getServiceId());
recalculatePartitions();
}, recalculateDelay, TimeUnit.MILLISECONDS);
delayedTasks.put(instance.getServiceId(), future);
break; break;
default: default:
break; break;

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
queue: queue:
type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -68,6 +68,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,6 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"