Perfomance improvement for tenant state load

This commit is contained in:
Andrii Shvaika 2021-03-19 13:51:42 +02:00
parent fbb3d85e08
commit a88d624ffa

View File

@ -72,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -421,20 +422,33 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
private void initStatesFromDataBase() { private void initStatesFromDataBase() {
try { try {
log.info("Initializing tenant states."); log.info("Initializing tenant states.");
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); updateLock.lock();
for (Tenant tenant : tenantIterator) { try {
if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { ExecutorService tmpInitExecutor = Executors.newWorkStealingPool(20);
log.debug("[{}] Initializing tenant state.", tenant.getId()); try {
updateLock.lock(); PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
try { List<Future<?>> futures = new ArrayList<>();
updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); for (Tenant tenant : tenantIterator) {
log.debug("[{}] Initialized tenant state.", tenant.getId()); if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) {
} catch (Exception e) { log.debug("[{}] Initializing tenant state.", tenant.getId());
log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); futures.add(tmpInitExecutor.submit(() -> {
} finally { try {
updateLock.unlock(); updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId()));
log.debug("[{}] Initialized tenant state.", tenant.getId());
} catch (Exception e) {
log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e);
}
}));
}
} }
for (Future<?> future : futures) {
future.get();
}
} finally {
tmpInitExecutor.shutdownNow();
} }
} finally {
updateLock.unlock();
} }
log.info("Initialized tenant states."); log.info("Initialized tenant states.");
} catch (Exception e) { } catch (Exception e) {