From eedbaeddaf74c90b7a1581349615dd15aa17bdaa Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 27 Nov 2024 17:41:37 +0200 Subject: [PATCH] Improvements for resources upgrade --- .../install/update/ResourcesUpdater.java | 72 ++++++++++++------- .../csv/AbstractBulkImportService.java | 19 ++--- .../common/util/ThingsBoardExecutors.java | 15 ++++ 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java index eb3760cac6..00797da449 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java @@ -16,10 +16,12 @@ package org.thingsboard.server.service.install.update; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.HasImage; import org.thingsboard.server.common.data.id.DashboardId; @@ -42,6 +44,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeDao; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleDao; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -86,44 +90,60 @@ public class ResourcesUpdater { log.debug("Created/updated system images and resources for default dashboard '{}'", defaultDashboard.getTitle()); } + @SneakyThrows public void updateDashboardsResources() { log.info("Updating resources usage in dashboards"); - var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512); - int totalCount = 0; - int updatedCount = 0; - for (DashboardId dashboardId : dashboards) { - Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId); - boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure - if (updated) { - dashboardService.saveDashboard(dashboard); - updatedCount++; - } - totalCount++; + var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "dashboards-resources-upgrade"); - if (totalCount % 1000 == 0) { - log.info("Processed {} dashboards, updated {}", totalCount, updatedCount); - } + var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512); + AtomicInteger totalCount = new AtomicInteger(); + AtomicInteger updatedCount = new AtomicInteger(); + for (DashboardId dashboardId : dashboards) { + executor.submit(() -> { + Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId); + boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure + if (updated) { + dashboardService.saveDashboard(dashboard); + updatedCount.incrementAndGet(); + } + if (totalCount.incrementAndGet() % 1000 == 0) { + log.info("Processed {} dashboards, updated {}", totalCount, updatedCount); + } + }); + } + + executor.shutdown(); + if (!executor.awaitTermination(10, TimeUnit.MINUTES)) { + throw new RuntimeException("Dashboards resources update timeout"); // just in case, should happen } log.info("Updated {} dashboards", updatedCount); } + @SneakyThrows public void updateWidgetsResources() { log.info("Updating resources usage in widgets"); - int totalCount = 0; - int updatedCount = 0; + var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "widgets-resources-upgrade"); + + AtomicInteger totalCount = new AtomicInteger(); + AtomicInteger updatedCount = new AtomicInteger(); var widgets = new PageDataIterable<>(widgetTypeService::findAllWidgetTypesIds, 512); for (WidgetTypeId widgetTypeId : widgets) { - WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId); - boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails); - if (updated) { - widgetTypeService.saveWidgetType(widgetTypeDetails); - updatedCount++; - } - totalCount++; + executor.submit(() -> { + WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId); + boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails); + if (updated) { + widgetTypeService.saveWidgetType(widgetTypeDetails); + updatedCount.incrementAndGet(); + } + if (totalCount.incrementAndGet() % 200 == 0) { + log.info("Processed {} widgets, updated {}", totalCount, updatedCount); + } + }); + } - if (totalCount % 200 == 0) { - log.info("Processed {} widgets, updated {}", totalCount, updatedCount); - } + executor.shutdown(); + if (!executor.awaitTermination(10, TimeUnit.MINUTES)) { + throw new RuntimeException("Widgets resources update timeout"); } log.info("Updated {} widgets", updatedCount); } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java index ff764b7db3..50d8b9c371 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java @@ -31,7 +31,7 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; @@ -71,8 +71,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -90,16 +89,11 @@ public abstract class AbstractBulkImportService(150_000), - ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy()); - executor.allowCoreThreadTimeOut(true); - } + executor = ThingsBoardExecutors.newLimitedTasksExecutor(Runtime.getRuntime().availableProcessors(), 150_000, "bulk-import"); } public final BulkImportResult processBulkImport(BulkImportRequest request, SecurityUser user) throws Exception { @@ -287,8 +281,8 @@ public abstract class AbstractBulkImportService fields = new LinkedHashMap<>(); private final Map kvs = new LinkedHashMap<>(); private int lineNumber; + } @Data diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java index fde2362091..913f32c7f9 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java @@ -17,6 +17,9 @@ package org.thingsboard.common.util; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class ThingsBoardExecutors { @@ -48,4 +51,16 @@ public class ThingsBoardExecutors { return newWorkStealingPool(parallelism, clazz.getSimpleName()); } + /* + * executor with limited tasks queue size + * */ + public static ExecutorService newLimitedTasksExecutor(int threads, int maxQueueSize, String name) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(maxQueueSize), + ThingsBoardThreadFactory.forName(name), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor.allowCoreThreadTimeOut(true); + return executor; + } + }