Improvements to the chunks processing

This commit is contained in:
Andrii Shvaika 2022-06-27 10:35:50 +03:00
parent a107c1ac44
commit e9c9e24e2f
4 changed files with 35 additions and 59 deletions

View File

@ -79,6 +79,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -125,7 +126,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request);
registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build()
).build(), wrap(future, commit, commit.getRequestId()));
).build(), wrap(future, commit));
return future;
}
@ -151,7 +152,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
.setChunksCount(chunksCount).build()
).build()
).build(), wrap(chunkFuture, null, commit.getRequestId()));
).build(), wrap(chunkFuture, null));
futures.add(chunkFuture);
});
return Futures.transform(Futures.allAsList(futures), r -> {
@ -170,7 +171,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
buildCommitRequest(commit).setDeleteMsg(
TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build()
).build()
).build(), wrap(future, null, commit.getRequestId()));
).build(), wrap(future, null));
return future;
}
@ -280,18 +281,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.build()));
}
@Override
public ListenableFuture<String> getContentsDiff(TenantId tenantId, String content1, String content2) {
ContentsDiffGitRequest request = new ContentsDiffGitRequest(tenantId, content1, content2);
return sendRequest(request, builder -> builder.setContentsDiffRequest(TransportProtos.ContentsDiffRequestMsg.newBuilder()
.setContent1(content1)
.setContent2(content2)));
}
@Override
@SuppressWarnings("rawtypes")
public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
.setVersionId(versionId)
.setEntityType(entityId.getEntityType().name())
@ -314,9 +308,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
if (request.getTimeoutTask() == null) {
request.setTimeoutTask(scheduler.schedule(() -> {
processTimeout(request.getRequestId());
}, requestTimeout, TimeUnit.MILLISECONDS));
request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS));
}
} else {
throw new RuntimeException("Future is already done!");
@ -335,7 +327,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@SuppressWarnings("rawtypes")
public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder()
.setVersionId(versionId)
.setEntityType(entityType.name())
@ -418,14 +410,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse));
} else if (vcResponseMsg.hasEntityContentResponse()) {
TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse();
String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>())
.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]);
msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData();
log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex());
if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) {
var joined = joinChunks(requestId, responseMsg, 1);
if (joined.isPresent()) {
log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId());
String data = String.join("", msgChunks);
((EntityContentGitRequest) request).getFuture().set(toData(data));
((EntityContentGitRequest) request).getFuture().set(joined.get().get(0));
} else {
completed = false;
}
@ -433,17 +422,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse();
TransportProtos.EntityContentResponseMsg item = responseMsg.getItem();
if (responseMsg.getItemsCount() > 0) {
Map<String, String[]> chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>());
String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> {
return new String[item.getChunksCount()];
});
itemChunks[item.getChunkIndex()] = item.getData();
if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream()
.allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) {
((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream()
.map(chunks -> String.join("", chunks))
.map(this::toData)
.collect(Collectors.toList()));
var joined = joinChunks(requestId, item, responseMsg.getItemsCount());
if (joined.isPresent()) {
((EntitiesContentGitRequest) request).getFuture().set(joined.get());
} else {
completed = false;
}
@ -464,9 +445,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.build())
.collect(Collectors.toList());
((VersionsDiffGitRequest) request).getFuture().set(entityVersionsDiffList);
} else if (vcResponseMsg.hasContentsDiffResponse()) {
String diff = vcResponseMsg.getContentsDiffResponse().getDiff();
((ContentsDiffGitRequest) request).getFuture().set(diff);
}
}
if (completed) {
@ -474,6 +452,25 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
}
}
@SuppressWarnings("rawtypes")
private Optional<List<EntityExportData>> joinChunks(UUID requestId, TransportProtos.EntityContentResponseMsg responseMsg, int expectedMsgCount) {
var chunksMap = chunkedMsgs.get(requestId);
if (chunksMap == null) {
return Optional.empty();
}
String[] msgChunks = chunksMap.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]);
msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData();
if (chunksMap.size() == expectedMsgCount && chunksMap.values().stream()
.allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) {
return Optional.of(chunksMap.values().stream()
.map(chunks -> String.join("", chunks))
.map(this::toData)
.collect(Collectors.toList()));
} else {
return Optional.empty();
}
}
private void processTimeout(UUID requestId) {
PendingGitRequest<?> pendingRequest = removePendingRequest(requestId);
if (pendingRequest != null) {
@ -515,6 +512,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return JacksonUtil.fromString(data, EntityExportData.class);
}
//The future will be completed when the corresponding result arrives from kafka
private static <T> TbQueueCallback wrap(SettableFuture<T> future) {
return new TbQueueCallback() {
@Override
@ -528,18 +526,17 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
};
}
private <T> TbQueueCallback wrap(SettableFuture<T> future, T value, UUID requestId) {
//The future will be completed when the request is successfully sent to kafka
private <T> TbQueueCallback wrap(SettableFuture<T> future, T value) {
return new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
future.set(value);
removePendingRequest(requestId);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
removePendingRequest(requestId);
}
};
}

View File

@ -64,8 +64,6 @@ public interface GitVersionControlQueueService {
ListenableFuture<List<EntityVersionsDiff>> getVersionsDiff(TenantId tenantId, EntityType entityType, EntityId externalId, String versionId1, String versionId2);
ListenableFuture<String> getContentsDiff(TenantId tenantId, String rawEntityData1, String rawEntityData2);
ListenableFuture<Void> initRepository(TenantId tenantId, RepositorySettings settings);
ListenableFuture<Void> testRepository(TenantId tenantId, RepositorySettings settings);

View File

@ -829,15 +829,6 @@ message EntityVersionsDiff {
string rawDiff = 6;
}
message ContentsDiffRequestMsg {
string content1 = 1;
string content2 = 2;
}
message ContentsDiffResponseMsg {
string diff = 1;
}
message GenericRepositoryRequestMsg {}
message GenericRepositoryResponseMsg {}
@ -859,7 +850,6 @@ message ToVersionControlServiceMsg {
EntityContentRequestMsg entityContentRequest = 14;
EntitiesContentRequestMsg entitiesContentRequest = 15;
VersionsDiffRequestMsg versionsDiffRequest = 16;
ContentsDiffRequestMsg contentsDiffRequest = 17;
}
message VersionControlResponseMsg {
@ -874,7 +864,6 @@ message VersionControlResponseMsg {
EntityContentResponseMsg entityContentResponse = 9;
EntitiesContentResponseMsg entitiesContentResponse = 10;
VersionsDiffResponseMsg versionsDiffResponse = 11;
ContentsDiffResponseMsg contentsDiffResponse = 12;
}
/**

View File

@ -257,8 +257,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest());
} else if (msg.hasVersionsDiffRequest()) {
handleVersionsDiffRequest(ctx, msg.getVersionsDiffRequest());
} else if (msg.hasContentsDiffRequest()) {
handleContentsDiffRequest(ctx, msg.getContentsDiffRequest());
}
}
}
@ -394,12 +392,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
.addAllDiff(diffList)));
}
private void handleContentsDiffRequest(VersionControlRequestCtx ctx, TransportProtos.ContentsDiffRequestMsg request) throws IOException {
String diff = vcService.getContentsDiff(ctx.getTenantId(), request.getContent1(), request.getContent2());
reply(ctx, builder -> builder.setContentsDiffResponse(TransportProtos.ContentsDiffResponseMsg.newBuilder()
.setDiff(diff)));
}
private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception {
var tenantId = ctx.getTenantId();
UUID txId = UUID.fromString(request.getTxId());