VC: fix Kafka buffer exhausted and producer stuck; refactoring and improvements

This commit is contained in:
ViacheslavKlimov 2023-12-22 14:53:52 +02:00
parent d788b00a69
commit daabf26708
8 changed files with 130 additions and 142 deletions

View File

@ -31,6 +31,7 @@ import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Dashboard;
@ -833,18 +834,14 @@ public abstract class BaseController {
} }
protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future) { protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future) {
final DeferredResult<T> deferredResult = new DeferredResult<>(); DeferredResult<T> deferredResult = new DeferredResult<>(); // Timeout of spring.mvc.async.request-timeout is used
Futures.addCallback(future, new FutureCallback<>() { DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult);
@Override return deferredResult;
public void onSuccess(T result) {
deferredResult.setResult(result);
} }
@Override protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future, long timeoutMs) {
public void onFailure(Throwable t) { DeferredResult<T> deferredResult = new DeferredResult<>(timeoutMs);
deferredResult.setErrorResult(t); DonAsynchron.withCallback(future, deferredResult::setResult, deferredResult::setErrorResult);
}
}, MoreExecutors.directExecutor());
return deferredResult; return deferredResult;
} }

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
@ -83,6 +84,8 @@ public class EntitiesVersionControlController extends BaseController {
private final EntitiesVersionControlService versionControlService; private final EntitiesVersionControlService versionControlService;
@Value("${queue.vc.request-timeout:180000}")
private int vcRequestTimeout;
@ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" + @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" +
"Creates a new version of entities (or a single entity) by request.\n" + "Creates a new version of entities (or a single entity) by request.\n" +
@ -515,4 +518,9 @@ public class EntitiesVersionControlController extends BaseController {
}, MoreExecutors.directExecutor())); }, MoreExecutors.directExecutor()));
} }
@Override
protected <T> DeferredResult<T> wrapFuture(ListenableFuture<T> future) {
return wrapFuture(future, vcRequestTimeout);
}
} }

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.executors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.AbstractListeningExecutor;
@Component
public class VersionControlExecutor extends AbstractListeningExecutor {
@Value("${vc.thread_pool_size:6}")
private int threadPoolSize;
@Override
protected int getThreadPollSize() {
return threadPoolSize;
}
}

View File

@ -17,19 +17,16 @@ package org.thingsboard.server.service.sync.vc;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.TbStopWatch; import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity; import org.thingsboard.server.common.data.ExportableEntity;
@ -72,6 +69,7 @@ import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException; import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
import org.thingsboard.server.service.executors.VersionControlExecutor;
import org.thingsboard.server.service.sync.ie.EntitiesExportImportService; import org.thingsboard.server.service.sync.ie.EntitiesExportImportService;
import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService; import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService;
import org.thingsboard.server.service.sync.ie.importing.impl.MissingEntityException; import org.thingsboard.server.service.sync.ie.importing.impl.MissingEntityException;
@ -85,8 +83,6 @@ import org.thingsboard.server.service.sync.vc.data.ReimportTask;
import org.thingsboard.server.service.sync.vc.data.SimpleEntitiesExportCtx; import org.thingsboard.server.service.sync.vc.data.SimpleEntitiesExportCtx;
import org.thingsboard.server.service.sync.vc.repository.TbRepositorySettingsService; import org.thingsboard.server.service.sync.vc.repository.TbRepositorySettingsService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -115,23 +111,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
private final TbNotificationEntityService entityNotificationService; private final TbNotificationEntityService entityNotificationService;
private final TransactionTemplate transactionTemplate; private final TransactionTemplate transactionTemplate;
private final TbTransactionalCache<UUID, VersionControlTaskCacheEntry> taskCache; private final TbTransactionalCache<UUID, VersionControlTaskCacheEntry> taskCache;
private final VersionControlExecutor executor;
private ListeningExecutorService executor;
@Value("${vc.thread_pool_size:4}")
private int threadPoolSize;
@PostConstruct
public void init() {
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, DefaultEntitiesVersionControlService.class));
}
@PreDestroy
public void shutdown() {
if (executor != null) {
executor.shutdownNow();
}
}
@SuppressWarnings("UnstableApiUsage") @SuppressWarnings("UnstableApiUsage")
@Override @Override
@ -185,7 +165,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND); throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND);
} else { } else {
var entry = cacheEntry.get(); var entry = cacheEntry.get();
log.debug("[{}] Cache get: {}", requestId, entry); log.trace("[{}] Cache get: {}", requestId, entry);
var result = getter.apply(entry); var result = getter.apply(entry);
if (result == null) { if (result == null) {
throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS); throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS);
@ -623,7 +603,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
} }
private VersionLoadResult cachePut(UUID requestId, VersionLoadResult result) { private VersionLoadResult cachePut(UUID requestId, VersionLoadResult result) {
log.debug("[{}] Cache put: {}", requestId, result); log.trace("[{}] Cache put: {}", requestId, result);
taskCache.put(requestId, VersionControlTaskCacheEntry.newForImport(result)); taskCache.put(requestId, VersionControlTaskCacheEntry.newForImport(result));
return result; return result;
} }

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.service.sync.vc;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -62,6 +61,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.VersionControlExecutor;
import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest; import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest;
import org.thingsboard.server.service.sync.vc.data.CommitGitRequest; import org.thingsboard.server.service.sync.vc.data.CommitGitRequest;
import org.thingsboard.server.service.sync.vc.data.EntitiesContentGitRequest; import org.thingsboard.server.service.sync.vc.data.EntitiesContentGitRequest;
@ -92,6 +92,7 @@ import java.util.stream.Collectors;
@TbCoreComponent @TbCoreComponent
@Service @Service
@Slf4j @Slf4j
@SuppressWarnings("UnstableApiUsage")
public class DefaultGitVersionControlQueueService implements GitVersionControlQueueService { public class DefaultGitVersionControlQueueService implements GitVersionControlQueueService {
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
@ -99,40 +100,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final DataDecodingEncodingService encodingService; private final DataDecodingEncodingService encodingService;
private final DefaultEntitiesVersionControlService entitiesVersionControlService; private final DefaultEntitiesVersionControlService entitiesVersionControlService;
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final VersionControlExecutor executor;
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>(); private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new ConcurrentHashMap<>();
private final Map<UUID, HashMap<Integer, String[]>> chunkedMsgs = new ConcurrentHashMap<>(); private final Map<UUID, HashMap<Integer, String[]>> chunkedMsgs = new ConcurrentHashMap<>();
@Value("${queue.vc.request-timeout:180000}") @Value("${queue.vc.request-timeout:180000}")
private int requestTimeout; private int requestTimeout;
@Value("${queue.vc.msg-chunk-size:500000}") @Value("${queue.vc.msg-chunk-size:250000}")
private int msgChunkSize; private int msgChunkSize;
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
DataDecodingEncodingService encodingService, DataDecodingEncodingService encodingService,
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService, @Lazy DefaultEntitiesVersionControlService entitiesVersionControlService,
SchedulerComponent scheduler) { SchedulerComponent scheduler, VersionControlExecutor executor) {
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.clusterService = clusterService; this.clusterService = clusterService;
this.encodingService = encodingService; this.encodingService = encodingService;
this.entitiesVersionControlService = entitiesVersionControlService; this.entitiesVersionControlService = entitiesVersionControlService;
this.scheduler = scheduler; this.scheduler = scheduler;
this.executor = executor;
} }
@Override @Override
public ListenableFuture<CommitGitRequest> prepareCommit(User user, VersionCreateRequest request) { public ListenableFuture<CommitGitRequest> prepareCommit(User user, VersionCreateRequest request) {
SettableFuture<CommitGitRequest> future = SettableFuture.create(); log.debug("Executing prepareCommit [{}][{}]", request.getBranch(), request.getVersionName());
CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request);
registerAndSend(commit, builder -> builder.setCommitRequest( ListenableFuture<Void> future = registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build()
).build(), wrap(future, commit)); ).build());
return future; return Futures.transform(future, f -> commit, executor);
} }
@SuppressWarnings("UnstableApiUsage") @SneakyThrows
@Override @Override
public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) { public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) {
log.debug("Executing addToCommit [{}][{}][{}]", entityData.getEntityType(), entityData.getEntity().getId(), commit.getRequestId());
String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId()); String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId());
String entityDataJson = JacksonUtil.toPrettyString(entityData.sort()); String entityDataJson = JacksonUtil.toPrettyString(entityData.sort());
@ -143,53 +146,42 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
AtomicInteger chunkIndex = new AtomicInteger(); AtomicInteger chunkIndex = new AtomicInteger();
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
entityDataChunks.forEach(chunk -> { entityDataChunks.forEach(chunk -> {
SettableFuture<Void> chunkFuture = SettableFuture.create();
log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get()); log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get());
registerAndSend(commit, builder -> builder.setCommitRequest( ListenableFuture<Void> chunkFuture = registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setAddMsg( buildCommitRequest(commit).setAddMsg(TransportProtos.AddMsg.newBuilder()
TransportProtos.AddMsg.newBuilder()
.setRelativePath(path).setEntityDataJsonChunk(chunk) .setRelativePath(path).setEntityDataJsonChunk(chunk)
.setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
.setChunksCount(chunksCount).build() .setChunksCount(chunksCount)
).build() ).build()
).build(), wrap(chunkFuture, null)); ).build());
futures.add(chunkFuture); futures.add(chunkFuture);
}); });
return Futures.transform(Futures.allAsList(futures), r -> { return Futures.transform(Futures.allAsList(futures), r -> {
log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId); log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId);
return null; return null;
}, MoreExecutors.directExecutor()); }, executor);
} }
@Override @Override
public ListenableFuture<Void> deleteAll(CommitGitRequest commit, EntityType entityType) { public ListenableFuture<Void> deleteAll(CommitGitRequest commit, EntityType entityType) {
SettableFuture<Void> future = SettableFuture.create(); log.debug("Executing deleteAll [{}][{}][{}]", commit.getTenantId(), entityType, commit.getRequestId());
String path = getRelativePath(entityType, null); String path = getRelativePath(entityType, null);
return registerAndSend(commit, builder -> builder.setCommitRequest(
registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setDeleteMsg( buildCommitRequest(commit).setDeleteMsg(
TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() TransportProtos.DeleteMsg.newBuilder().setRelativePath(path)
).build() )).build());
).build(), wrap(future, null));
return future;
} }
@Override @Override
public ListenableFuture<VersionCreationResult> push(CommitGitRequest commit) { public ListenableFuture<VersionCreationResult> push(CommitGitRequest commit) {
registerAndSend(commit, builder -> builder.setCommitRequest( log.debug("Executing push [{}][{}]", commit.getTenantId(), commit.getRequestId());
buildCommitRequest(commit).setPushMsg( return sendRequest(commit, builder -> builder.setCommitRequest(
TransportProtos.PushMsg.newBuilder().build() buildCommitRequest(commit).setPushMsg(TransportProtos.PushMsg.getDefaultInstance())
).build() ));
).build(), wrap(commit.getFuture()));
return commit.getFuture();
} }
@Override @Override
public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, PageLink pageLink) { public ListenableFuture<PageData<EntityVersion>> listVersions(TenantId tenantId, String branch, PageLink pageLink) {
return listVersions(tenantId, return listVersions(tenantId,
applyPageLinkParameters( applyPageLinkParameters(
ListVersionsRequestMsg.newBuilder() ListVersionsRequestMsg.newBuilder()
@ -284,90 +276,95 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override @Override
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) { public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
log.debug("Executing getEntity [{}][{}][{}]", tenantId, versionId, entityId);
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
chunkedMsgs.put(request.getRequestId(), new HashMap<>()); chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() return sendRequest(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
.setVersionId(versionId) .setVersionId(versionId)
.setEntityType(entityId.getEntityType().name()) .setEntityType(entityId.getEntityType().name())
.setEntityIdMSB(entityId.getId().getMostSignificantBits()) .setEntityIdMSB(entityId.getId().getMostSignificantBits())
.setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build() .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build());
, wrap(request.getFuture()));
return request.getFuture();
} }
private <T> void registerAndSend(PendingGitRequest<T> request, private <T> ListenableFuture<Void> registerAndSend(PendingGitRequest<T> request,
Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, TbQueueCallback callback) { Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction) {
registerAndSend(request, enrichFunction, null, callback); return registerAndSend(request, enrichFunction, null);
} }
private <T> void registerAndSend(PendingGitRequest<T> request, private <T> ListenableFuture<Void> registerAndSend(PendingGitRequest<T> request,
Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, RepositorySettings settings, TbQueueCallback callback) { Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction,
RepositorySettings settings) {
if (!request.getFuture().isDone()) { if (!request.getFuture().isDone()) {
pendingRequestMap.putIfAbsent(request.getRequestId(), request); pendingRequestMap.putIfAbsent(request.getRequestId(), request);
var requestBody = enrichFunction.apply(newRequestProto(request, settings)); var requestBody = enrichFunction.apply(newRequestProto(request, settings));
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); SettableFuture<Void> submitFuture = SettableFuture.create();
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
submitFuture.set(null);
}
@Override
public void onFailure(Throwable t) {
submitFuture.setException(t);
}
});
if (request.getTimeoutTask() == null) { if (request.getTimeoutTask() == null) {
request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS)); request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS));
} }
return submitFuture;
} else { } else {
throw new RuntimeException("Future is already done!"); throw new RuntimeException("Future is already done!");
} }
} }
private <T> ListenableFuture<T> sendRequest(PendingGitRequest<T> request, Consumer<ToVersionControlServiceMsg.Builder> enrichFunction) { private <T> ListenableFuture<T> sendRequest(PendingGitRequest<T> request, Consumer<ToVersionControlServiceMsg.Builder> enrichFunction) {
registerAndSend(request, builder -> { return sendRequest(request, enrichFunction, null);
}
private <T> ListenableFuture<T> sendRequest(PendingGitRequest<T> request, Consumer<ToVersionControlServiceMsg.Builder> enrichFunction, RepositorySettings settings) {
ListenableFuture<Void> submitFuture = registerAndSend(request, builder -> {
enrichFunction.accept(builder); enrichFunction.accept(builder);
return builder.build(); return builder.build();
}, wrap(request.getFuture())); }, settings);
return request.getFuture(); return Futures.transformAsync(submitFuture, input -> request.getFuture(), executor);
} }
@Override @Override
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
log.debug("Executing getEntities [{}][{}][{}]", tenantId, versionId, entityType);
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
chunkedMsgs.put(request.getRequestId(), new HashMap<>()); chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() return sendRequest(request, builder -> builder.setEntitiesContentRequest(
EntitiesContentRequestMsg.newBuilder()
.setVersionId(versionId) .setVersionId(versionId)
.setEntityType(entityType.name()) .setEntityType(entityType.name())
.setOffset(offset) .setOffset(offset)
.setLimit(limit) .setLimit(limit)
).build() ).build());
, wrap(request.getFuture()));
return request.getFuture();
} }
@Override @Override
public ListenableFuture<Void> initRepository(TenantId tenantId, RepositorySettings settings) { public ListenableFuture<Void> initRepository(TenantId tenantId, RepositorySettings settings) {
log.debug("Executing initRepository [{}]", tenantId);
VoidGitRequest request = new VoidGitRequest(tenantId); VoidGitRequest request = new VoidGitRequest(tenantId);
return sendRequest(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings);
registerAndSend(request, builder -> builder.setInitRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
, settings, wrap(request.getFuture()));
return request.getFuture();
} }
@Override @Override
public ListenableFuture<Void> testRepository(TenantId tenantId, RepositorySettings settings) { public ListenableFuture<Void> testRepository(TenantId tenantId, RepositorySettings settings) {
log.debug("Executing testRepository [{}]", tenantId);
VoidGitRequest request = new VoidGitRequest(tenantId); VoidGitRequest request = new VoidGitRequest(tenantId);
return sendRequest(request, builder -> builder.setTestRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()), settings);
registerAndSend(request, builder -> builder
.setTestRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
, settings, wrap(request.getFuture()));
return request.getFuture();
} }
@Override @Override
public ListenableFuture<Void> clearRepository(TenantId tenantId) { public ListenableFuture<Void> clearRepository(TenantId tenantId) {
log.debug("Executing clearRepository [{}]", tenantId);
ClearRepositoryGitRequest request = new ClearRepositoryGitRequest(tenantId); ClearRepositoryGitRequest request = new ClearRepositoryGitRequest(tenantId);
return sendRequest(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.getDefaultInstance()));
registerAndSend(request, builder -> builder.setClearRepositoryRequest(GenericRepositoryRequestMsg.newBuilder().build()).build()
, wrap(request.getFuture()));
return request.getFuture();
} }
@Override @Override
@ -518,35 +515,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return JacksonUtil.fromString(data, EntityExportData.class); return JacksonUtil.fromString(data, EntityExportData.class);
} }
//The future will be completed when the corresponding result arrives from kafka
private static <T> TbQueueCallback wrap(SettableFuture<T> future) {
return new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
};
}
//The future will be completed when the request is successfully sent to kafka
private <T> TbQueueCallback wrap(SettableFuture<T> future, T value) {
return new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
future.set(value);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
};
}
private static String getRelativePath(EntityType entityType, EntityId entityId) { private static String getRelativePath(EntityType entityType, EntityId entityId) {
String path = entityType.name().toLowerCase(); String path = entityType.name().toLowerCase();
if (entityId != null) { if (entityId != null) {

View File

@ -1379,7 +1379,7 @@ queue:
# Kafka properties for OTA updates topic # Kafka properties for OTA updates topic
ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}" ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
# Kafka properties for Version Control topic # Kafka properties for Version Control topic
version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}" version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats: consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics # Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
@ -1672,7 +1672,7 @@ metrics:
# Version control parameters # Version control parameters
vc: vc:
# Pool size for handling export tasks # Pool size for handling export tasks
thread_pool_size: "${TB_VC_POOL_SIZE:2}" thread_pool_size: "${TB_VC_POOL_SIZE:6}"
git: git:
# Pool size for handling the git IO operations # Pool size for handling the git IO operations
io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}" io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"

View File

@ -126,7 +126,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private long packProcessingTimeout; private long packProcessingTimeout;
@Value("${vc.git.io_pool_size:3}") @Value("${vc.git.io_pool_size:3}")
private int ioPoolSize; private int ioPoolSize;
@Value("${queue.vc.msg-chunk-size:500000}") @Value("${queue.vc.msg-chunk-size:250000}")
private int msgChunkSize; private int msgChunkSize;
//We need to manually manage the threads since tasks for particular tenant need to be processed sequentially. //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially.
@ -303,6 +303,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException {
String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString());
log.debug("Executing handleEntityContentRequest [{}][{}]", ctx.getTenantId(), path);
String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId());
Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize); Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
@ -393,6 +394,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
} }
private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception { private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception {
log.debug("Executing handleCommitRequest [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
var tenantId = ctx.getTenantId(); var tenantId = ctx.getTenantId();
UUID txId = UUID.fromString(request.getTxId()); UUID txId = UUID.fromString(request.getTxId());
if (request.hasPrepareMsg()) { if (request.hasPrepareMsg()) {
@ -443,6 +445,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
} }
private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException { private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException {
log.debug("Executing addToCommit [{}][{}]", ctx.getTenantId(), ctx.getRequestId());
log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex()); log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex());
Map<String, String[]> chunkedMsgs = commit.getChunkedMsgs(); Map<String, String[]> chunkedMsgs = commit.getChunkedMsgs();
String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]); String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]);

View File

@ -136,7 +136,7 @@ queue:
# Kafka properties for Notifications topics # Kafka properties for Notifications topics
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Core topics # Kafka properties for Core topics
version-control: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats: consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics # Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
@ -263,7 +263,7 @@ queue:
# Version control parameters # Version control parameters
vc: vc:
# Pool size for handling export tasks # Pool size for handling export tasks
thread_pool_size: "${TB_VC_POOL_SIZE:2}" thread_pool_size: "${TB_VC_POOL_SIZE:6}"
git: git:
# Pool size for handling the git IO operations # Pool size for handling the git IO operations
io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}" io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"