improvements

This commit is contained in:
YevhenBondarenko 2023-07-06 13:31:25 +02:00 committed by Andrii Shvaika
parent 68149d9673
commit ce9552e1a8

View File

@ -299,16 +299,16 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
case CHILD_ADDED: case CHILD_ADDED:
ScheduledFuture<?> task = delayedTasks.remove(instance.getServiceId()); ScheduledFuture<?> task = delayedTasks.remove(instance.getServiceId());
if (task != null) { if (task != null) {
if (!task.cancel(false)) { if (task.cancel(false)) {
log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].",
instance.getServiceId(), instance.getServiceTypesList());
} else {
log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!",
instance.getServiceId(), instance.getServiceTypesList()); instance.getServiceId(), instance.getServiceTypesList());
recalculatePartitions(); recalculatePartitions();
} else {
log.debug("[{}] Recalculate partitions ignored. Service restarted in time [{}]",
instance.getServiceId(), instance.getServiceTypesList());
} }
} else { } else {
log.debug("[{}] Going to recalculate partitions due to adding new node [{}]", log.debug("[{}] Going to recalculate partitions due to adding new node [{}].",
instance.getServiceId(), instance.getServiceTypesList()); instance.getServiceId(), instance.getServiceTypesList());
recalculatePartitions(); recalculatePartitions();
} }
@ -317,8 +317,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
ScheduledFuture<?> future = zkExecutorService.schedule(() -> { ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
log.debug("[{}] Going to recalculate partitions due to removed node [{}]", log.debug("[{}] Going to recalculate partitions due to removed node [{}]",
instance.getServiceId(), instance.getServiceTypesList()); instance.getServiceId(), instance.getServiceTypesList());
delayedTasks.remove(instance.getServiceId()); ScheduledFuture<?> removedTask = delayedTasks.remove(instance.getServiceId());
recalculatePartitions(); if (removedTask != null) {
recalculatePartitions();
}
}, recalculateDelay, TimeUnit.MILLISECONDS); }, recalculateDelay, TimeUnit.MILLISECONDS);
delayedTasks.put(instance.getServiceId(), future); delayedTasks.put(instance.getServiceId(), future);
break; break;
@ -332,6 +334,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
* Synchronized to ensure that other servers info is up to date * Synchronized to ensure that other servers info is up to date
* */ * */
synchronized void recalculatePartitions() { synchronized void recalculatePartitions() {
delayedTasks.clear();
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers());
} }