From e9c9e24e2f1f036db73daacb933bf0f788ec7558 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 27 Jun 2022 10:35:50 +0300 Subject: [PATCH] Improvements to the chunks processing --- .../DefaultGitVersionControlQueueService.java | 73 +++++++++---------- .../vc/GitVersionControlQueueService.java | 2 - common/cluster-api/src/main/proto/queue.proto | 11 --- .../DefaultClusterVersionControlService.java | 8 -- 4 files changed, 35 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 5a726c871f..6eaa2df74f 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -79,6 +79,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -125,7 +126,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() - ).build(), wrap(future, commit, commit.getRequestId())); + ).build(), wrap(future, commit)); return future; } @@ -151,7 +152,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) .setChunksCount(chunksCount).build() ).build() - ).build(), wrap(chunkFuture, null, commit.getRequestId())); + ).build(), wrap(chunkFuture, null)); futures.add(chunkFuture); }); return Futures.transform(Futures.allAsList(futures), r -> { @@ -170,7 +171,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu buildCommitRequest(commit).setDeleteMsg( TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() ).build() - ).build(), wrap(future, null, commit.getRequestId())); + ).build(), wrap(future, null)); return future; } @@ -280,18 +281,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .build())); } - @Override - public ListenableFuture getContentsDiff(TenantId tenantId, String content1, String content2) { - ContentsDiffGitRequest request = new ContentsDiffGitRequest(tenantId, content1, content2); - return sendRequest(request, builder -> builder.setContentsDiffRequest(TransportProtos.ContentsDiffRequestMsg.newBuilder() - .setContent1(content1) - .setContent2(content2))); - } - @Override @SuppressWarnings("rawtypes") public ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId) { EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); + chunkedMsgs.put(request.getRequestId(), new HashMap<>()); registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() .setVersionId(versionId) .setEntityType(entityId.getEntityType().name()) @@ -314,9 +308,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); if (request.getTimeoutTask() == null) { - request.setTimeoutTask(scheduler.schedule(() -> { - processTimeout(request.getRequestId()); - }, requestTimeout, TimeUnit.MILLISECONDS)); + request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS)); } } else { throw new RuntimeException("Future is already done!"); @@ -335,7 +327,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @SuppressWarnings("rawtypes") public ListenableFuture> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); - + chunkedMsgs.put(request.getRequestId(), new HashMap<>()); registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() .setVersionId(versionId) .setEntityType(entityType.name()) @@ -418,14 +410,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu ((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); } else if (vcResponseMsg.hasEntityContentResponse()) { TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse(); - String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()) - .computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); - msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex()); - if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) { + var joined = joinChunks(requestId, responseMsg, 1); + if (joined.isPresent()) { log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId()); - String data = String.join("", msgChunks); - ((EntityContentGitRequest) request).getFuture().set(toData(data)); + ((EntityContentGitRequest) request).getFuture().set(joined.get().get(0)); } else { completed = false; } @@ -433,17 +422,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse(); TransportProtos.EntityContentResponseMsg item = responseMsg.getItem(); if (responseMsg.getItemsCount() > 0) { - Map chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()); - String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> { - return new String[item.getChunksCount()]; - }); - itemChunks[item.getChunkIndex()] = item.getData(); - if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream() - .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { - ((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream() - .map(chunks -> String.join("", chunks)) - .map(this::toData) - .collect(Collectors.toList())); + var joined = joinChunks(requestId, item, responseMsg.getItemsCount()); + if (joined.isPresent()) { + ((EntitiesContentGitRequest) request).getFuture().set(joined.get()); } else { completed = false; } @@ -464,9 +445,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .build()) .collect(Collectors.toList()); ((VersionsDiffGitRequest) request).getFuture().set(entityVersionsDiffList); - } else if (vcResponseMsg.hasContentsDiffResponse()) { - String diff = vcResponseMsg.getContentsDiffResponse().getDiff(); - ((ContentsDiffGitRequest) request).getFuture().set(diff); } } if (completed) { @@ -474,6 +452,25 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu } } + @SuppressWarnings("rawtypes") + private Optional> joinChunks(UUID requestId, TransportProtos.EntityContentResponseMsg responseMsg, int expectedMsgCount) { + var chunksMap = chunkedMsgs.get(requestId); + if (chunksMap == null) { + return Optional.empty(); + } + String[] msgChunks = chunksMap.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); + msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); + if (chunksMap.size() == expectedMsgCount && chunksMap.values().stream() + .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { + return Optional.of(chunksMap.values().stream() + .map(chunks -> String.join("", chunks)) + .map(this::toData) + .collect(Collectors.toList())); + } else { + return Optional.empty(); + } + } + private void processTimeout(UUID requestId) { PendingGitRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { @@ -515,6 +512,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu return JacksonUtil.fromString(data, EntityExportData.class); } + //The future will be completed when the corresponding result arrives from kafka private static TbQueueCallback wrap(SettableFuture future) { return new TbQueueCallback() { @Override @@ -528,18 +526,17 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu }; } - private TbQueueCallback wrap(SettableFuture future, T value, UUID requestId) { + //The future will be completed when the request is successfully sent to kafka + private TbQueueCallback wrap(SettableFuture future, T value) { return new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { future.set(value); - removePendingRequest(requestId); } @Override public void onFailure(Throwable t) { future.setException(t); - removePendingRequest(requestId); } }; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java index a1e64da8f1..cc83479896 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java @@ -64,8 +64,6 @@ public interface GitVersionControlQueueService { ListenableFuture> getVersionsDiff(TenantId tenantId, EntityType entityType, EntityId externalId, String versionId1, String versionId2); - ListenableFuture getContentsDiff(TenantId tenantId, String rawEntityData1, String rawEntityData2); - ListenableFuture initRepository(TenantId tenantId, RepositorySettings settings); ListenableFuture testRepository(TenantId tenantId, RepositorySettings settings); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 855ddb9299..1b6bc43f74 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -829,15 +829,6 @@ message EntityVersionsDiff { string rawDiff = 6; } -message ContentsDiffRequestMsg { - string content1 = 1; - string content2 = 2; -} - -message ContentsDiffResponseMsg { - string diff = 1; -} - message GenericRepositoryRequestMsg {} message GenericRepositoryResponseMsg {} @@ -859,7 +850,6 @@ message ToVersionControlServiceMsg { EntityContentRequestMsg entityContentRequest = 14; EntitiesContentRequestMsg entitiesContentRequest = 15; VersionsDiffRequestMsg versionsDiffRequest = 16; - ContentsDiffRequestMsg contentsDiffRequest = 17; } message VersionControlResponseMsg { @@ -874,7 +864,6 @@ message VersionControlResponseMsg { EntityContentResponseMsg entityContentResponse = 9; EntitiesContentResponseMsg entitiesContentResponse = 10; VersionsDiffResponseMsg versionsDiffResponse = 11; - ContentsDiffResponseMsg contentsDiffResponse = 12; } /** diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index d8d137ae5b..7d8e81d1c4 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -257,8 +257,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest()); } else if (msg.hasVersionsDiffRequest()) { handleVersionsDiffRequest(ctx, msg.getVersionsDiffRequest()); - } else if (msg.hasContentsDiffRequest()) { - handleContentsDiffRequest(ctx, msg.getContentsDiffRequest()); } } } @@ -394,12 +392,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe .addAllDiff(diffList))); } - private void handleContentsDiffRequest(VersionControlRequestCtx ctx, TransportProtos.ContentsDiffRequestMsg request) throws IOException { - String diff = vcService.getContentsDiff(ctx.getTenantId(), request.getContent1(), request.getContent2()); - reply(ctx, builder -> builder.setContentsDiffResponse(TransportProtos.ContentsDiffResponseMsg.newBuilder() - .setDiff(diff))); - } - private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception { var tenantId = ctx.getTenantId(); UUID txId = UUID.fromString(request.getTxId());