diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 8226610a7d..f12a20094d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -31,6 +31,7 @@ import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; @@ -833,18 +834,14 @@ public abstract class BaseController { } protected DeferredResult wrapFuture(ListenableFuture future) { - final DeferredResult deferredResult = new DeferredResult<>(); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(T result) { - deferredResult.setResult(result); - } + DeferredResult deferredResult = new DeferredResult<>(); // Timeout of spring.mvc.async.request-timeout is used + DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult); + return deferredResult; + } - @Override - public void onFailure(Throwable t) { - deferredResult.setErrorResult(t); - } - }, MoreExecutors.directExecutor()); + protected DeferredResult wrapFuture(ListenableFuture future, long timeoutMs) { + DeferredResult deferredResult = new DeferredResult<>(timeoutMs); + DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult); return deferredResult; } diff --git a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java index b5c8e18ed2..bfd874186a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -83,6 +84,8 @@ public class EntitiesVersionControlController extends BaseController { private final EntitiesVersionControlService versionControlService; + @Value("${queue.vc.request-timeout:180000}") + private int vcRequestTimeout; @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" + "Creates a new version of entities (or a single entity) by request.\n" + @@ -515,4 +518,9 @@ public class EntitiesVersionControlController extends BaseController { }, MoreExecutors.directExecutor())); } + @Override + protected DeferredResult wrapFuture(ListenableFuture future) { + return wrapFuture(future, vcRequestTimeout); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/executors/VersionControlExecutor.java b/application/src/main/java/org/thingsboard/server/service/executors/VersionControlExecutor.java new file mode 100644 index 0000000000..a3957653d9 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/executors/VersionControlExecutor.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.executors; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.AbstractListeningExecutor; + +@Component +public class VersionControlExecutor extends AbstractListeningExecutor { + + @Value("${vc.thread_pool_size:6}") + private int threadPoolSize; + + @Override + protected int getThreadPollSize() { + return threadPoolSize; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java index af2ac5cf0f..784e6f5fbe 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java @@ -17,19 +17,16 @@ package org.thingsboard.server.service.sync.vc; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionTemplate; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.TbStopWatch; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ExportableEntity; @@ -72,6 +69,7 @@ import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.TbNotificationEntityService; +import org.thingsboard.server.service.executors.VersionControlExecutor; import org.thingsboard.server.service.sync.ie.EntitiesExportImportService; import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService; import org.thingsboard.server.service.sync.ie.importing.impl.MissingEntityException; @@ -85,8 +83,6 @@ import org.thingsboard.server.service.sync.vc.data.ReimportTask; import org.thingsboard.server.service.sync.vc.data.SimpleEntitiesExportCtx; import org.thingsboard.server.service.sync.vc.repository.TbRepositorySettingsService; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -115,23 +111,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont private final TbNotificationEntityService entityNotificationService; private final TransactionTemplate transactionTemplate; private final TbTransactionalCache taskCache; - - private ListeningExecutorService executor; - - @Value("${vc.thread_pool_size:4}") - private int threadPoolSize; - - @PostConstruct - public void init() { - executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, DefaultEntitiesVersionControlService.class)); - } - - @PreDestroy - public void shutdown() { - if (executor != null) { - executor.shutdownNow(); - } - } + private final VersionControlExecutor executor; @SuppressWarnings("UnstableApiUsage") @Override @@ -185,7 +165,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND); } else { var entry = cacheEntry.get(); - log.debug("[{}] Cache get: {}", requestId, entry); + log.trace("[{}] Cache get: {}", requestId, entry); var result = getter.apply(entry); if (result == null) { throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS); @@ -631,7 +611,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont } private VersionLoadResult cachePut(UUID requestId, VersionLoadResult result) { - log.debug("[{}] Cache put: {}", requestId, result); + log.trace("[{}] Cache put: {}", requestId, result); taskCache.put(requestId, VersionControlTaskCacheEntry.newForImport(result)); return result; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 0cc73cd945..7a1c6bec6e 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.sync.vc; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import lombok.SneakyThrows; @@ -62,6 +61,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.executors.VersionControlExecutor; import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest; import org.thingsboard.server.service.sync.vc.data.CommitGitRequest; import org.thingsboard.server.service.sync.vc.data.EntitiesContentGitRequest; @@ -92,6 +92,7 @@ import java.util.stream.Collectors; @TbCoreComponent @Service @Slf4j +@SuppressWarnings("UnstableApiUsage") public class DefaultGitVersionControlQueueService implements GitVersionControlQueueService { private final TbServiceInfoProvider serviceInfoProvider; @@ -99,40 +100,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu private final DataDecodingEncodingService encodingService; private final DefaultEntitiesVersionControlService entitiesVersionControlService; private final SchedulerComponent scheduler; + private final VersionControlExecutor executor; - private final Map> pendingRequestMap = new HashMap<>(); + private final Map> pendingRequestMap = new ConcurrentHashMap<>(); private final Map> chunkedMsgs = new ConcurrentHashMap<>(); @Value("${queue.vc.request-timeout:180000}") private int requestTimeout; - @Value("${queue.vc.msg-chunk-size:500000}") + @Value("${queue.vc.msg-chunk-size:250000}") private int msgChunkSize; public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, DataDecodingEncodingService encodingService, @Lazy DefaultEntitiesVersionControlService entitiesVersionControlService, - SchedulerComponent scheduler) { + SchedulerComponent scheduler, VersionControlExecutor executor) { this.serviceInfoProvider = serviceInfoProvider; this.clusterService = clusterService; this.encodingService = encodingService; this.entitiesVersionControlService = entitiesVersionControlService; this.scheduler = scheduler; + this.executor = executor; } @Override public ListenableFuture prepareCommit(User user, VersionCreateRequest request) { - SettableFuture future = SettableFuture.create(); - + log.debug("Executing prepareCommit [{}][{}]", request.getBranch(), request.getVersionName()); CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); - registerAndSend(commit, builder -> builder.setCommitRequest( + ListenableFuture future = registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() - ).build(), wrap(future, commit)); - return future; + ).build()); + return Futures.transform(future, f -> commit, executor); } - @SuppressWarnings("UnstableApiUsage") + @SneakyThrows @Override public ListenableFuture addToCommit(CommitGitRequest commit, EntityExportData> entityData) { + log.debug("Executing addToCommit [{}][{}][{}]", entityData.getEntityType(), entityData.getEntity().getId(), commit.getRequestId()); String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId()); String entityDataJson = JacksonUtil.toPrettyString(entityData.sort()); @@ -143,53 +146,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu AtomicInteger chunkIndex = new AtomicInteger(); List> futures = new ArrayList<>(); entityDataChunks.forEach(chunk -> { - SettableFuture chunkFuture = SettableFuture.create(); log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get()); - registerAndSend(commit, builder -> builder.setCommitRequest( - buildCommitRequest(commit).setAddMsg( - TransportProtos.AddMsg.newBuilder() - .setRelativePath(path).setEntityDataJsonChunk(chunk) - .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) - .setChunksCount(chunksCount).build() + ListenableFuture chunkFuture = registerAndSend(commit, builder -> builder.setCommitRequest( + buildCommitRequest(commit).setAddMsg(TransportProtos.AddMsg.newBuilder() + .setRelativePath(path).setEntityDataJsonChunk(chunk) + .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) + .setChunksCount(chunksCount) ).build() - ).build(), wrap(chunkFuture, null)); + ).build()); futures.add(chunkFuture); }); return Futures.transform(Futures.allAsList(futures), r -> { log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId); return null; - }, MoreExecutors.directExecutor()); + }, executor); } @Override public ListenableFuture deleteAll(CommitGitRequest commit, EntityType entityType) { - SettableFuture future = SettableFuture.create(); - + log.debug("Executing deleteAll [{}][{}][{}]", commit.getTenantId(), entityType, commit.getRequestId()); String path = getRelativePath(entityType, null); - - registerAndSend(commit, builder -> builder.setCommitRequest( + return registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setDeleteMsg( - TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() - ).build() - ).build(), wrap(future, null)); - - return future; + TransportProtos.DeleteMsg.newBuilder().setRelativePath(path) + )).build()); } @Override public ListenableFuture push(CommitGitRequest commit) { - registerAndSend(commit, builder -> builder.setCommitRequest( - buildCommitRequest(commit).setPushMsg( - TransportProtos.PushMsg.newBuilder().build() - ).build() - ).build(), wrap(commit.getFuture())); - - return commit.getFuture(); + log.debug("Executing push [{}][{}]", commit.getTenantId(), commit.getRequestId()); + return sendRequest(commit, builder -> builder.setCommitRequest( + buildCommitRequest(commit).setPushMsg(TransportProtos.PushMsg.getDefaultInstance()) + )); } @Override public ListenableFuture> listVersions(TenantId tenantId, String branch, PageLink pageLink) { - return listVersions(tenantId, applyPageLinkParameters( ListVersionsRequestMsg.newBuilder() @@ -284,90 +276,95 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override @SuppressWarnings("rawtypes") public ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId) { + log.debug("Executing getEntity [{}][{}][{}]", tenantId, versionId, entityId); EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); chunkedMsgs.put(request.getRequestId(), new HashMap<>()); - registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() - .setVersionId(versionId) - .setEntityType(entityId.getEntityType().name()) - .setEntityIdMSB(entityId.getId().getMostSignificantBits()) - .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build() - , wrap(request.getFuture())); - return request.getFuture(); + return sendRequest(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() + .setVersionId(versionId) + .setEntityType(entityId.getEntityType().name()) + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build()); } - private void registerAndSend(PendingGitRequest request, - Function enrichFunction, TbQueueCallback callback) { - registerAndSend(request, enrichFunction, null, callback); + private ListenableFuture registerAndSend(PendingGitRequest request, + Function enrichFunction) { + return registerAndSend(request, enrichFunction, null); } - private void registerAndSend(PendingGitRequest request, - Function enrichFunction, RepositorySettings settings, TbQueueCallback callback) { + private ListenableFuture registerAndSend(PendingGitRequest request, + Function enrichFunction, + RepositorySettings settings) { if (!request.getFuture().isDone()) { pendingRequestMap.putIfAbsent(request.getRequestId(), request); var requestBody = enrichFunction.apply(newRequestProto(request, settings)); log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); - clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); + SettableFuture submitFuture = SettableFuture.create(); + clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + submitFuture.set(null); + } + + @Override + public void onFailure(Throwable t) { + submitFuture.setException(t); + } + }); if (request.getTimeoutTask() == null) { request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS)); } + return submitFuture; } else { throw new RuntimeException("Future is already done!"); } } private ListenableFuture sendRequest(PendingGitRequest request, Consumer enrichFunction) { - registerAndSend(request, builder -> { + return sendRequest(request, enrichFunction, null); + } + + private ListenableFuture sendRequest(PendingGitRequest request, Consumer enrichFunction, RepositorySettings settings) { + ListenableFuture submitFuture = registerAndSend(request, builder -> { enrichFunction.accept(builder); return builder.build(); - }, wrap(request.getFuture())); - return request.getFuture(); + }, settings); + return Futures.transformAsync(submitFuture, input -> request.getFuture(), executor); } @Override @SuppressWarnings("rawtypes") public ListenableFuture> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { + log.debug("Executing getEntities [{}][{}][{}]", tenantId, versionId, entityType); EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); chunkedMsgs.put(request.getRequestId(), new HashMap<>()); - registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() + return sendRequest(request, builder -> builder.setEntitiesContentRequest( + EntitiesContentRequestMsg.newBuilder() .setVersionId(versionId) .setEntityType(entityType.name()) .setOffset(offset) .setLimit(limit) - ).build() - , wrap(request.getFuture())); - - return request.getFuture(); + ).build()); } @Override public ListenableFuture initRepository(TenantId tenantId, RepositorySettings settings) { + log.debug("Executing initRepository [{}]", tenantId); VoidGitRequest request = new VoidGitRequest(tenantId); - - registerAndSend(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() - , settings, wrap(request.getFuture())); - - return request.getFuture(); + return sendRequest(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings); } @Override public ListenableFuture testRepository(TenantId tenantId, RepositorySettings settings) { + log.debug("Executing testRepository [{}]", tenantId); VoidGitRequest request = new VoidGitRequest(tenantId); - - registerAndSend(request, builder -> builder - .setTestRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() - , settings, wrap(request.getFuture())); - - return request.getFuture(); + return sendRequest(request, builder -> builder.setTestRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings); } @Override public ListenableFuture clearRepository(TenantId tenantId) { + log.debug("Executing clearRepository [{}]", tenantId); ClearRepositoryGitRequest request = new ClearRepositoryGitRequest(tenantId); - - registerAndSend(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build() - , wrap(request.getFuture())); - - return request.getFuture(); + return sendRequest(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance())); } @Override @@ -518,35 +515,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu return JacksonUtil.fromString(data, EntityExportData.class); } - //The future will be completed when the corresponding result arrives from kafka - private static TbQueueCallback wrap(SettableFuture future) { - return new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - }; - } - - //The future will be completed when the request is successfully sent to kafka - private TbQueueCallback wrap(SettableFuture future, T value) { - return new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - future.set(value); - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - }; - } - private static String getRelativePath(EntityType entityType, EntityId entityId) { String path = entityType.name().toLowerCase(); if (entityId != null) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 5b37ae7ce7..c650d53b8f 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1379,7 +1379,7 @@ queue: # Kafka properties for OTA updates topic ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}" # Kafka properties for Version Control topic - version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}" + version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" @@ -1672,7 +1672,7 @@ metrics: # Version control parameters vc: # Pool size for handling export tasks - thread_pool_size: "${TB_VC_POOL_SIZE:2}" + thread_pool_size: "${TB_VC_POOL_SIZE:6}" git: # Pool size for handling the git IO operations io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}" diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index 38128c4387..fa45ddf98c 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -127,7 +127,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private long packProcessingTimeout; @Value("${vc.git.io_pool_size:3}") private int ioPoolSize; - @Value("${queue.vc.msg-chunk-size:500000}") + @Value("${queue.vc.msg-chunk-size:250000}") private int msgChunkSize; //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially. @@ -304,6 +304,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); + log.debug("Executing handleEntityContentRequest [{}][{}]", ctx.getTenantId(), path); String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); Iterable dataChunks = StringUtils.split(data, msgChunkSize); @@ -394,6 +395,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception { + log.debug("Executing handleCommitRequest [{}][{}]", ctx.getTenantId(), ctx.getRequestId()); var tenantId = ctx.getTenantId(); UUID txId = UUID.fromString(request.getTxId()); if (request.hasPrepareMsg()) { @@ -444,6 +446,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException { + log.debug("Executing addToCommit [{}][{}]", ctx.getTenantId(), ctx.getRequestId()); log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex()); Map chunkedMsgs = commit.getChunkedMsgs(); String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]); diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 645acfb164..962c5303c7 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -136,7 +136,7 @@ queue: # Kafka properties for Notifications topics notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Core topics - version-control: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" @@ -263,7 +263,7 @@ queue: # Version control parameters vc: # Pool size for handling export tasks - thread_pool_size: "${TB_VC_POOL_SIZE:2}" + thread_pool_size: "${TB_VC_POOL_SIZE:6}" git: # Pool size for handling the git IO operations io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"