Merge pull request #12134 from thingsboard/improvements/resources-upgrade

Improvements for resources upgrade
This commit is contained in:
Viacheslav Klimov 2024-11-27 17:52:20 +02:00 committed by GitHub
commit f58576b8b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 68 additions and 38 deletions

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.service.install.update;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.HasImage;
import org.thingsboard.server.common.data.id.DashboardId;
@ -42,6 +44,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeDao;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleDao;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -86,44 +90,60 @@ public class ResourcesUpdater {
log.debug("Created/updated system images and resources for default dashboard '{}'", defaultDashboard.getTitle());
}
@SneakyThrows
public void updateDashboardsResources() {
log.info("Updating resources usage in dashboards");
var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512);
int totalCount = 0;
int updatedCount = 0;
for (DashboardId dashboardId : dashboards) {
Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId);
boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure
if (updated) {
dashboardService.saveDashboard(dashboard);
updatedCount++;
}
totalCount++;
var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "dashboards-resources-upgrade");
if (totalCount % 1000 == 0) {
log.info("Processed {} dashboards, updated {}", totalCount, updatedCount);
}
var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512);
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger updatedCount = new AtomicInteger();
for (DashboardId dashboardId : dashboards) {
executor.submit(() -> {
Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId);
boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure
if (updated) {
dashboardService.saveDashboard(dashboard);
updatedCount.incrementAndGet();
}
if (totalCount.incrementAndGet() % 1000 == 0) {
log.info("Processed {} dashboards, updated {}", totalCount, updatedCount);
}
});
}
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
throw new RuntimeException("Dashboards resources update timeout"); // just in case, should happen
}
log.info("Updated {} dashboards", updatedCount);
}
@SneakyThrows
public void updateWidgetsResources() {
log.info("Updating resources usage in widgets");
int totalCount = 0;
int updatedCount = 0;
var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "widgets-resources-upgrade");
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger updatedCount = new AtomicInteger();
var widgets = new PageDataIterable<>(widgetTypeService::findAllWidgetTypesIds, 512);
for (WidgetTypeId widgetTypeId : widgets) {
WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId);
boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails);
if (updated) {
widgetTypeService.saveWidgetType(widgetTypeDetails);
updatedCount++;
}
totalCount++;
executor.submit(() -> {
WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId);
boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails);
if (updated) {
widgetTypeService.saveWidgetType(widgetTypeDetails);
updatedCount.incrementAndGet();
}
if (totalCount.incrementAndGet() % 200 == 0) {
log.info("Processed {} widgets, updated {}", totalCount, updatedCount);
}
});
}
if (totalCount % 200 == 0) {
log.info("Processed {} widgets, updated {}", totalCount, updatedCount);
}
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
throw new RuntimeException("Widgets resources update timeout");
}
log.info("Updated {} widgets", updatedCount);
}

View File

@ -31,7 +31,7 @@ import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
@ -71,8 +71,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -90,16 +89,11 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
@Autowired
private EntityActionService entityActionService;
private ThreadPoolExecutor executor;
private ExecutorService executor;
@PostConstruct
private void initExecutor() {
if (executor == null) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000),
ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
}
executor = ThingsBoardExecutors.newLimitedTasksExecutor(Runtime.getRuntime().availableProcessors(), 150_000, "bulk-import");
}
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user) throws Exception {
@ -287,8 +281,8 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
@PreDestroy
private void shutdownExecutor() {
if (!executor.isTerminating()) {
executor.shutdown();
if (executor != null) {
executor.shutdownNow();
}
}
@ -297,6 +291,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>();
private final Map<BulkImportRequest.ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>();
private int lineNumber;
}
@Data

View File

@ -17,6 +17,9 @@ package org.thingsboard.common.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThingsBoardExecutors {
@ -48,4 +51,16 @@ public class ThingsBoardExecutors {
return newWorkStealingPool(parallelism, clazz.getSimpleName());
}
/*
* executor with limited tasks queue size
* */
public static ExecutorService newLimitedTasksExecutor(int threads, int maxQueueSize, String name) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(maxQueueSize),
ThingsBoardThreadFactory.forName(name),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
return executor;
}
}