Merge pull request #9900 from thingsboard/fix/vc-timeout
Version control performance improvements
This commit is contained in:
		
						commit
						5cc0dc6d94
					
				@ -31,6 +31,7 @@ import org.springframework.web.bind.MethodArgumentNotValidException;
 | 
			
		||||
import org.springframework.web.bind.annotation.ExceptionHandler;
 | 
			
		||||
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
 | 
			
		||||
import org.springframework.web.context.request.async.DeferredResult;
 | 
			
		||||
import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.Customer;
 | 
			
		||||
import org.thingsboard.server.common.data.Dashboard;
 | 
			
		||||
@ -833,18 +834,14 @@ public abstract class BaseController {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future) {
 | 
			
		||||
        final DeferredResult<T> deferredResult = new DeferredResult<>();
 | 
			
		||||
        Futures.addCallback(future, new FutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(T result) {
 | 
			
		||||
                deferredResult.setResult(result);
 | 
			
		||||
            }
 | 
			
		||||
        DeferredResult<T> deferredResult = new DeferredResult<>(); // Timeout of spring.mvc.async.request-timeout is used
 | 
			
		||||
        DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult);
 | 
			
		||||
        return deferredResult;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                deferredResult.setErrorResult(t);
 | 
			
		||||
            }
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
    protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future, long timeoutMs) {
 | 
			
		||||
        DeferredResult<T> deferredResult = new DeferredResult<>(timeoutMs);
 | 
			
		||||
        DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult);
 | 
			
		||||
        return deferredResult;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import io.swagger.annotations.ApiOperation;
 | 
			
		||||
import io.swagger.annotations.ApiParam;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.security.access.prepost.PreAuthorize;
 | 
			
		||||
import org.springframework.web.bind.annotation.GetMapping;
 | 
			
		||||
import org.springframework.web.bind.annotation.PathVariable;
 | 
			
		||||
@ -83,6 +84,8 @@ public class EntitiesVersionControlController extends BaseController {
 | 
			
		||||
 | 
			
		||||
    private final EntitiesVersionControlService versionControlService;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.vc.request-timeout:180000}")
 | 
			
		||||
    private int vcRequestTimeout;
 | 
			
		||||
 | 
			
		||||
    @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" +
 | 
			
		||||
            "Creates a new version of entities (or a single entity) by request.\n" +
 | 
			
		||||
@ -515,4 +518,9 @@ public class EntitiesVersionControlController extends BaseController {
 | 
			
		||||
        }, MoreExecutors.directExecutor()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future) {
 | 
			
		||||
        return wrapFuture(future, vcRequestTimeout);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,32 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.executors;
 | 
			
		||||
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.AbstractListeningExecutor;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
public class VersionControlExecutor extends AbstractListeningExecutor {
 | 
			
		||||
 | 
			
		||||
    @Value("${vc.thread_pool_size:6}")
 | 
			
		||||
    private int threadPoolSize;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected int getThreadPollSize() {
 | 
			
		||||
        return threadPoolSize;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -17,19 +17,16 @@ package org.thingsboard.server.service.sync.vc;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.transaction.support.TransactionTemplate;
 | 
			
		||||
import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.TbStopWatch;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.server.cache.TbTransactionalCache;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.ExportableEntity;
 | 
			
		||||
@ -72,6 +69,7 @@ import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
 | 
			
		||||
import org.thingsboard.server.service.executors.VersionControlExecutor;
 | 
			
		||||
import org.thingsboard.server.service.sync.ie.EntitiesExportImportService;
 | 
			
		||||
import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService;
 | 
			
		||||
import org.thingsboard.server.service.sync.ie.importing.impl.MissingEntityException;
 | 
			
		||||
@ -85,8 +83,6 @@ import org.thingsboard.server.service.sync.vc.data.ReimportTask;
 | 
			
		||||
import org.thingsboard.server.service.sync.vc.data.SimpleEntitiesExportCtx;
 | 
			
		||||
import org.thingsboard.server.service.sync.vc.repository.TbRepositorySettingsService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.time.Instant;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
@ -115,23 +111,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
 | 
			
		||||
    private final TbNotificationEntityService entityNotificationService;
 | 
			
		||||
    private final TransactionTemplate transactionTemplate;
 | 
			
		||||
    private final TbTransactionalCache<UUID, VersionControlTaskCacheEntry> taskCache;
 | 
			
		||||
 | 
			
		||||
    private ListeningExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Value("${vc.thread_pool_size:4}")
 | 
			
		||||
    private int threadPoolSize;
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, DefaultEntitiesVersionControlService.class));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    public void shutdown() {
 | 
			
		||||
        if (executor != null) {
 | 
			
		||||
            executor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    private final VersionControlExecutor executor;
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
    @Override
 | 
			
		||||
@ -185,7 +165,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
 | 
			
		||||
            throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND);
 | 
			
		||||
        } else {
 | 
			
		||||
            var entry = cacheEntry.get();
 | 
			
		||||
            log.debug("[{}] Cache get: {}", requestId, entry);
 | 
			
		||||
            log.trace("[{}] Cache get: {}", requestId, entry);
 | 
			
		||||
            var result = getter.apply(entry);
 | 
			
		||||
            if (result == null) {
 | 
			
		||||
                throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS);
 | 
			
		||||
@ -631,7 +611,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private VersionLoadResult cachePut(UUID requestId, VersionLoadResult result) {
 | 
			
		||||
        log.debug("[{}] Cache put: {}", requestId, result);
 | 
			
		||||
        log.trace("[{}] Cache put: {}", requestId, result);
 | 
			
		||||
        taskCache.put(requestId, VersionControlTaskCacheEntry.newForImport(result));
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,6 @@ package org.thingsboard.server.service.sync.vc;
 | 
			
		||||
import com.google.common.collect.Iterables;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import com.google.protobuf.ByteString;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
@ -62,6 +61,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
 | 
			
		||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.executors.VersionControlExecutor;
 | 
			
		||||
import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest;
 | 
			
		||||
import org.thingsboard.server.service.sync.vc.data.CommitGitRequest;
 | 
			
		||||
import org.thingsboard.server.service.sync.vc.data.EntitiesContentGitRequest;
 | 
			
		||||
@ -92,6 +92,7 @@ import java.util.stream.Collectors;
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
public class DefaultGitVersionControlQueueService implements GitVersionControlQueueService {
 | 
			
		||||
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
@ -99,40 +100,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
 | 
			
		||||
    private final DataDecodingEncodingService encodingService;
 | 
			
		||||
    private final DefaultEntitiesVersionControlService entitiesVersionControlService;
 | 
			
		||||
    private final SchedulerComponent scheduler;
 | 
			
		||||
    private final VersionControlExecutor executor;
 | 
			
		||||
 | 
			
		||||
    private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>();
 | 
			
		||||
    private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final Map<UUID, HashMap<Integer, String[]>> chunkedMsgs = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.vc.request-timeout:180000}")
 | 
			
		||||
    private int requestTimeout;
 | 
			
		||||
    @Value("${queue.vc.msg-chunk-size:500000}")
 | 
			
		||||
    @Value("${queue.vc.msg-chunk-size:250000}")
 | 
			
		||||
    private int msgChunkSize;
 | 
			
		||||
 | 
			
		||||
    public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
 | 
			
		||||
                                                DataDecodingEncodingService encodingService,
 | 
			
		||||
                                                @Lazy DefaultEntitiesVersionControlService entitiesVersionControlService,
 | 
			
		||||
                                                SchedulerComponent scheduler) {
 | 
			
		||||
                                                SchedulerComponent scheduler, VersionControlExecutor executor) {
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.clusterService = clusterService;
 | 
			
		||||
        this.encodingService = encodingService;
 | 
			
		||||
        this.entitiesVersionControlService = entitiesVersionControlService;
 | 
			
		||||
        this.scheduler = scheduler;
 | 
			
		||||
        this.executor = executor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<CommitGitRequest> prepareCommit(User user, VersionCreateRequest request) {
 | 
			
		||||
        SettableFuture<CommitGitRequest> future = SettableFuture.create();
 | 
			
		||||
 | 
			
		||||
        log.debug("Executing prepareCommit [{}][{}]", request.getBranch(), request.getVersionName());
 | 
			
		||||
        CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request);
 | 
			
		||||
        registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
        ListenableFuture<Void> future = registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build()
 | 
			
		||||
        ).build(), wrap(future, commit));
 | 
			
		||||
        return future;
 | 
			
		||||
        ).build());
 | 
			
		||||
        return Futures.transform(future, f -> commit, executor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
    @SneakyThrows
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) {
 | 
			
		||||
        log.debug("Executing addToCommit [{}][{}][{}]", entityData.getEntityType(), entityData.getEntity().getId(), commit.getRequestId());
 | 
			
		||||
        String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId());
 | 
			
		||||
        String entityDataJson = JacksonUtil.toPrettyString(entityData.sort());
 | 
			
		||||
 | 
			
		||||
@ -143,53 +146,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
 | 
			
		||||
        AtomicInteger chunkIndex = new AtomicInteger();
 | 
			
		||||
        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
			
		||||
        entityDataChunks.forEach(chunk -> {
 | 
			
		||||
            SettableFuture<Void> chunkFuture = SettableFuture.create();
 | 
			
		||||
            log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get());
 | 
			
		||||
            registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                    buildCommitRequest(commit).setAddMsg(
 | 
			
		||||
                            TransportProtos.AddMsg.newBuilder()
 | 
			
		||||
                                    .setRelativePath(path).setEntityDataJsonChunk(chunk)
 | 
			
		||||
                                    .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
 | 
			
		||||
                                    .setChunksCount(chunksCount).build()
 | 
			
		||||
            ListenableFuture<Void> chunkFuture = registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                    buildCommitRequest(commit).setAddMsg(TransportProtos.AddMsg.newBuilder()
 | 
			
		||||
                            .setRelativePath(path).setEntityDataJsonChunk(chunk)
 | 
			
		||||
                            .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
 | 
			
		||||
                            .setChunksCount(chunksCount)
 | 
			
		||||
                    ).build()
 | 
			
		||||
            ).build(), wrap(chunkFuture, null));
 | 
			
		||||
            ).build());
 | 
			
		||||
            futures.add(chunkFuture);
 | 
			
		||||
        });
 | 
			
		||||
        return Futures.transform(Futures.allAsList(futures), r -> {
 | 
			
		||||
            log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId);
 | 
			
		||||
            return null;
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
        }, executor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> deleteAll(CommitGitRequest commit, EntityType entityType) {
 | 
			
		||||
        SettableFuture<Void> future = SettableFuture.create();
 | 
			
		||||
 | 
			
		||||
        log.debug("Executing deleteAll [{}][{}][{}]", commit.getTenantId(), entityType, commit.getRequestId());
 | 
			
		||||
        String path = getRelativePath(entityType, null);
 | 
			
		||||
 | 
			
		||||
        registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
        return registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                buildCommitRequest(commit).setDeleteMsg(
 | 
			
		||||
                        TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build()
 | 
			
		||||
                ).build()
 | 
			
		||||
        ).build(), wrap(future, null));
 | 
			
		||||
 | 
			
		||||
        return future;
 | 
			
		||||
                        TransportProtos.DeleteMsg.newBuilder().setRelativePath(path)
 | 
			
		||||
                )).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<VersionCreationResult> push(CommitGitRequest commit) {
 | 
			
		||||
        registerAndSend(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                buildCommitRequest(commit).setPushMsg(
 | 
			
		||||
                        TransportProtos.PushMsg.newBuilder().build()
 | 
			
		||||
                ).build()
 | 
			
		||||
        ).build(), wrap(commit.getFuture()));
 | 
			
		||||
 | 
			
		||||
        return commit.getFuture();
 | 
			
		||||
        log.debug("Executing push [{}][{}]", commit.getTenantId(), commit.getRequestId());
 | 
			
		||||
        return sendRequest(commit, builder -> builder.setCommitRequest(
 | 
			
		||||
                buildCommitRequest(commit).setPushMsg(TransportProtos.PushMsg.getDefaultInstance())
 | 
			
		||||
        ));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, PageLink pageLink) {
 | 
			
		||||
 | 
			
		||||
        return listVersions(tenantId,
 | 
			
		||||
                applyPageLinkParameters(
 | 
			
		||||
                        ListVersionsRequestMsg.newBuilder()
 | 
			
		||||
@ -284,90 +276,95 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
 | 
			
		||||
    @Override
 | 
			
		||||
    @SuppressWarnings("rawtypes")
 | 
			
		||||
    public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
 | 
			
		||||
        log.debug("Executing getEntity [{}][{}][{}]", tenantId, versionId, entityId);
 | 
			
		||||
        EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
 | 
			
		||||
        chunkedMsgs.put(request.getRequestId(), new HashMap<>());
 | 
			
		||||
        registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
 | 
			
		||||
                        .setVersionId(versionId)
 | 
			
		||||
                        .setEntityType(entityId.getEntityType().name())
 | 
			
		||||
                        .setEntityIdMSB(entityId.getId().getMostSignificantBits())
 | 
			
		||||
                        .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build()
 | 
			
		||||
                , wrap(request.getFuture()));
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        return sendRequest(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
 | 
			
		||||
                .setVersionId(versionId)
 | 
			
		||||
                .setEntityType(entityId.getEntityType().name())
 | 
			
		||||
                .setEntityIdMSB(entityId.getId().getMostSignificantBits())
 | 
			
		||||
                .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> void registerAndSend(PendingGitRequest<T> request,
 | 
			
		||||
                                     Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, TbQueueCallback callback) {
 | 
			
		||||
        registerAndSend(request, enrichFunction, null, callback);
 | 
			
		||||
    private <T> ListenableFuture<Void> registerAndSend(PendingGitRequest<T> request,
 | 
			
		||||
                                                       Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction) {
 | 
			
		||||
        return registerAndSend(request, enrichFunction, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> void registerAndSend(PendingGitRequest<T> request,
 | 
			
		||||
                                     Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, RepositorySettings settings, TbQueueCallback callback) {
 | 
			
		||||
    private <T> ListenableFuture<Void> registerAndSend(PendingGitRequest<T> request,
 | 
			
		||||
                                                       Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction,
 | 
			
		||||
                                                       RepositorySettings settings) {
 | 
			
		||||
        if (!request.getFuture().isDone()) {
 | 
			
		||||
            pendingRequestMap.putIfAbsent(request.getRequestId(), request);
 | 
			
		||||
            var requestBody = enrichFunction.apply(newRequestProto(request, settings));
 | 
			
		||||
            log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
 | 
			
		||||
            clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
 | 
			
		||||
            SettableFuture<Void> submitFuture = SettableFuture.create();
 | 
			
		||||
            clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, new TbQueueCallback() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(TbQueueMsgMetadata metadata) {
 | 
			
		||||
                    submitFuture.set(null);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    submitFuture.setException(t);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            if (request.getTimeoutTask() == null) {
 | 
			
		||||
                request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS));
 | 
			
		||||
            }
 | 
			
		||||
            return submitFuture;
 | 
			
		||||
        } else {
 | 
			
		||||
            throw new RuntimeException("Future is already done!");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> ListenableFuture<T> sendRequest(PendingGitRequest<T> request, Consumer<ToVersionControlServiceMsg.Builder> enrichFunction) {
 | 
			
		||||
        registerAndSend(request, builder -> {
 | 
			
		||||
        return sendRequest(request, enrichFunction, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> ListenableFuture<T> sendRequest(PendingGitRequest<T> request, Consumer<ToVersionControlServiceMsg.Builder> enrichFunction, RepositorySettings settings) {
 | 
			
		||||
        ListenableFuture<Void> submitFuture = registerAndSend(request, builder -> {
 | 
			
		||||
            enrichFunction.accept(builder);
 | 
			
		||||
            return builder.build();
 | 
			
		||||
        }, wrap(request.getFuture()));
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        }, settings);
 | 
			
		||||
        return Futures.transformAsync(submitFuture, input -> request.getFuture(), executor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    @SuppressWarnings("rawtypes")
 | 
			
		||||
    public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
 | 
			
		||||
        log.debug("Executing getEntities [{}][{}][{}]", tenantId, versionId, entityType);
 | 
			
		||||
        EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
 | 
			
		||||
        chunkedMsgs.put(request.getRequestId(), new HashMap<>());
 | 
			
		||||
        registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder()
 | 
			
		||||
        return sendRequest(request, builder -> builder.setEntitiesContentRequest(
 | 
			
		||||
                EntitiesContentRequestMsg.newBuilder()
 | 
			
		||||
                        .setVersionId(versionId)
 | 
			
		||||
                        .setEntityType(entityType.name())
 | 
			
		||||
                        .setOffset(offset)
 | 
			
		||||
                        .setLimit(limit)
 | 
			
		||||
                ).build()
 | 
			
		||||
                , wrap(request.getFuture()));
 | 
			
		||||
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        ).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> initRepository(TenantId tenantId, RepositorySettings settings) {
 | 
			
		||||
        log.debug("Executing initRepository [{}]", tenantId);
 | 
			
		||||
        VoidGitRequest request = new VoidGitRequest(tenantId);
 | 
			
		||||
 | 
			
		||||
        registerAndSend(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
 | 
			
		||||
                , settings, wrap(request.getFuture()));
 | 
			
		||||
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        return sendRequest(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> testRepository(TenantId tenantId, RepositorySettings settings) {
 | 
			
		||||
        log.debug("Executing testRepository [{}]", tenantId);
 | 
			
		||||
        VoidGitRequest request = new VoidGitRequest(tenantId);
 | 
			
		||||
 | 
			
		||||
        registerAndSend(request, builder -> builder
 | 
			
		||||
                        .setTestRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
 | 
			
		||||
                , settings, wrap(request.getFuture()));
 | 
			
		||||
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        return sendRequest(request, builder -> builder.setTestRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> clearRepository(TenantId tenantId) {
 | 
			
		||||
        log.debug("Executing clearRepository [{}]", tenantId);
 | 
			
		||||
        ClearRepositoryGitRequest request = new ClearRepositoryGitRequest(tenantId);
 | 
			
		||||
 | 
			
		||||
        registerAndSend(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
 | 
			
		||||
                , wrap(request.getFuture()));
 | 
			
		||||
 | 
			
		||||
        return request.getFuture();
 | 
			
		||||
        return sendRequest(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -518,35 +515,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
 | 
			
		||||
        return JacksonUtil.fromString(data, EntityExportData.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //The future will be completed when the corresponding result arrives from kafka
 | 
			
		||||
    private static <T> TbQueueCallback wrap(SettableFuture<T> future) {
 | 
			
		||||
        return new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(TbQueueMsgMetadata metadata) {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                future.setException(t);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //The future will be completed when the request is successfully sent to kafka
 | 
			
		||||
    private <T> TbQueueCallback wrap(SettableFuture<T> future, T value) {
 | 
			
		||||
        return new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(TbQueueMsgMetadata metadata) {
 | 
			
		||||
                future.set(value);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                future.setException(t);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static String getRelativePath(EntityType entityType, EntityId entityId) {
 | 
			
		||||
        String path = entityType.name().toLowerCase();
 | 
			
		||||
        if (entityId != null) {
 | 
			
		||||
 | 
			
		||||
@ -1379,7 +1379,7 @@ queue:
 | 
			
		||||
      # Kafka properties for OTA updates topic
 | 
			
		||||
      ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for Version Control topic
 | 
			
		||||
      version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
 | 
			
		||||
      version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
@ -1672,7 +1672,7 @@ metrics:
 | 
			
		||||
# Version control parameters
 | 
			
		||||
vc:
 | 
			
		||||
  # Pool size for handling export tasks
 | 
			
		||||
  thread_pool_size: "${TB_VC_POOL_SIZE:2}"
 | 
			
		||||
  thread_pool_size: "${TB_VC_POOL_SIZE:6}"
 | 
			
		||||
  git:
 | 
			
		||||
    # Pool size for handling the git IO operations
 | 
			
		||||
    io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"
 | 
			
		||||
 | 
			
		||||
@ -127,7 +127,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
 | 
			
		||||
    private long packProcessingTimeout;
 | 
			
		||||
    @Value("${vc.git.io_pool_size:3}")
 | 
			
		||||
    private int ioPoolSize;
 | 
			
		||||
    @Value("${queue.vc.msg-chunk-size:500000}")
 | 
			
		||||
    @Value("${queue.vc.msg-chunk-size:250000}")
 | 
			
		||||
    private int msgChunkSize;
 | 
			
		||||
 | 
			
		||||
    //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially.
 | 
			
		||||
@ -304,6 +304,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
 | 
			
		||||
 | 
			
		||||
    private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException {
 | 
			
		||||
        String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString());
 | 
			
		||||
        log.debug("Executing handleEntityContentRequest [{}][{}]", ctx.getTenantId(), path);
 | 
			
		||||
        String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId());
 | 
			
		||||
 | 
			
		||||
        Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
 | 
			
		||||
@ -394,6 +395,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception {
 | 
			
		||||
        log.debug("Executing handleCommitRequest [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
 | 
			
		||||
        var tenantId = ctx.getTenantId();
 | 
			
		||||
        UUID txId = UUID.fromString(request.getTxId());
 | 
			
		||||
        if (request.hasPrepareMsg()) {
 | 
			
		||||
@ -444,6 +446,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException {
 | 
			
		||||
        log.debug("Executing addToCommit [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
 | 
			
		||||
        log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex());
 | 
			
		||||
        Map<String, String[]> chunkedMsgs = commit.getChunkedMsgs();
 | 
			
		||||
        String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]);
 | 
			
		||||
 | 
			
		||||
@ -136,7 +136,7 @@ queue:
 | 
			
		||||
      # Kafka properties for Notifications topics
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for Core topics
 | 
			
		||||
      version-control: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
@ -263,7 +263,7 @@ queue:
 | 
			
		||||
# Version control parameters
 | 
			
		||||
vc:
 | 
			
		||||
  # Pool size for handling export tasks
 | 
			
		||||
  thread_pool_size: "${TB_VC_POOL_SIZE:2}"
 | 
			
		||||
  thread_pool_size: "${TB_VC_POOL_SIZE:6}"
 | 
			
		||||
  git:
 | 
			
		||||
    # Pool size for handling the git IO operations
 | 
			
		||||
    io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user