From d6b516e708d449b0c82b381502f8a6bc087d2e9c Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Wed, 22 Jun 2022 18:06:50 +0300 Subject: [PATCH] VC queue message chunking --- .../impl/BaseEntityImportService.java | 1 + .../DefaultEntitiesVersionControlService.java | 18 +-- .../DefaultGitVersionControlQueueService.java | 117 ++++++++++++++---- .../src/main/resources/thingsboard.yml | 1 + common/cluster-api/src/main/proto/queue.proto | 16 ++- .../server/common/data/StringUtils.java | 6 + .../common/data/sync/vc/EntityDataDiff.java | 1 - .../data/sync/vc/VersionedEntityInfo.java | 1 - .../common/util/CollectionsUtil.java | 8 ++ .../DefaultClusterVersionControlService.java | 60 +++++++-- .../server/service/sync/vc/PendingCommit.java | 13 +- .../src/main/resources/tb-vc-executor.yml | 1 + ui-ngx/src/app/shared/models/vc.models.ts | 1 - 13 files changed, 195 insertions(+), 49 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 26c5a869f9..285401e3bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -160,6 +160,7 @@ public abstract class BaseEntityImportService importResult; try { importResult = exportImportService.importEntity(ctx, entityData); @@ -391,6 +392,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>()) .add(importResult.getSavedEntity().getId()); } + log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId()); offset += limit; } while (entityDataList.size() == limit); } @@ -456,18 +458,20 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont EntityId externalId = ((ExportableEntity) entity).getExternalId(); if (externalId == null) externalId = entityId; - return transformAsync(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId), + return transform(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId), otherVersion -> { SimpleEntitiesExportCtx ctx = new SimpleEntitiesExportCtx(user, null, null, EntityExportSettings.builder() .exportRelations(otherVersion.hasRelations()) .exportAttributes(otherVersion.hasAttributes()) .exportCredentials(otherVersion.hasCredentials()) .build()); - EntityExportData currentVersion = exportImportService.exportEntity(ctx, entityId); - return transform(gitServiceQueue.getContentsDiff(user.getTenantId(), - JacksonUtil.toPrettyString(currentVersion.sort()), - JacksonUtil.toPrettyString(otherVersion.sort())), - rawDiff -> new EntityDataDiff(currentVersion, otherVersion, rawDiff), MoreExecutors.directExecutor()); + EntityExportData currentVersion; + try { + currentVersion = exportImportService.exportEntity(ctx, entityId); + } catch (ThingsboardException e) { + throw new RuntimeException(e); + } + return new EntityDataDiff(currentVersion.sort(), otherVersion.sort()); }, MoreExecutors.directExecutor()); } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 99246d282f..a1c79a2aac 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -15,7 +15,10 @@ */ package org.thingsboard.server.service.sync.vc; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import lombok.SneakyThrows; @@ -23,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.CollectionsUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; @@ -70,12 +74,15 @@ import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest; import org.thingsboard.server.service.sync.vc.data.VoidGitRequest; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,9 +99,12 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu private final SchedulerComponent scheduler; private final Map> pendingRequestMap = new HashMap<>(); + private final Map> chunkedMsgs = new ConcurrentHashMap<>(); @Value("${queue.vc.request-timeout:60000}") private int requestTimeout; + @Value("${queue.vc.msg-chunk-size:500000}") + private int msgChunkSize; public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, DataDecodingEncodingService encodingService, @@ -114,24 +124,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() - ).build(), wrap(future, commit)); + ).build(), wrap(future, commit, commit.getRequestId())); return future; } + @SuppressWarnings("UnstableApiUsage") @Override public ListenableFuture addToCommit(CommitGitRequest commit, EntityExportData> entityData) { - SettableFuture future = SettableFuture.create(); - String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId()); String entityDataJson = JacksonUtil.toPrettyString(entityData.sort()); - registerAndSend(commit, builder -> builder.setCommitRequest( - buildCommitRequest(commit).setAddMsg( - TransportProtos.AddMsg.newBuilder() - .setRelativePath(path).setEntityDataJson(entityDataJson).build() - ).build() - ).build(), wrap(future, null)); - return future; + Iterable entityDataChunks = StringUtils.split(entityDataJson, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(entityDataChunks); + + AtomicInteger chunkIndex = new AtomicInteger(); + List> futures = new ArrayList<>(); + entityDataChunks.forEach(chunk -> { + SettableFuture chunkFuture = SettableFuture.create(); + log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get()); + registerAndSend(commit, builder -> builder.setCommitRequest( + buildCommitRequest(commit).setAddMsg( + TransportProtos.AddMsg.newBuilder() + .setRelativePath(path).setEntityDataJsonChunk(chunk) + .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) + .setChunksCount(chunksCount).build() + ).build() + ).build(), wrap(chunkFuture, null, commit.getRequestId())); + futures.add(chunkFuture); + }); + return Futures.transform(Futures.allAsList(futures), r -> { + log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId); + return null; + }, MoreExecutors.directExecutor()); } @Override @@ -144,7 +169,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu buildCommitRequest(commit).setDeleteMsg( TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() ).build() - ).build(), wrap(future, null)); + ).build(), wrap(future, null, commit.getRequestId())); return future; } @@ -220,7 +245,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() - .setBranchName(branch) .setVersionId(versionId) .setEntityType(entityType.name()) .build()); @@ -229,7 +253,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() - .setBranchName(branch) .setVersionId(versionId) .build()); } @@ -289,9 +312,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu var requestBody = enrichFunction.apply(newRequestProto(request, settings)); log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); - request.setTimeoutTask(scheduler.schedule(() -> { - processTimeout(request.getRequestId()); - }, requestTimeout, TimeUnit.MILLISECONDS)); + if (request.getTimeoutTask() == null) { + request.setTimeoutTask(scheduler.schedule(() -> { + processTimeout(request.getRequestId()); + }, requestTimeout, TimeUnit.MILLISECONDS)); + } } else { throw new RuntimeException("Future is already done!"); } @@ -355,15 +380,15 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public void processResponse(VersionControlResponseMsg vcResponseMsg) { UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB()); - PendingGitRequest request = pendingRequestMap.remove(requestId); + PendingGitRequest request = pendingRequestMap.get(requestId); if (request == null) { log.debug("[{}] received stale response: {}", requestId, vcResponseMsg); return; } else { log.debug("[{}] processing response: {}", requestId, vcResponseMsg); - request.getTimeoutTask().cancel(true); } var future = request.getFuture(); + boolean completed = true; if (!StringUtils.isEmpty(vcResponseMsg.getError())) { future.setException(new RuntimeException(vcResponseMsg.getError())); } else { @@ -391,12 +416,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu var listVersionsResponse = vcResponseMsg.getListVersionsResponse(); ((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); } else if (vcResponseMsg.hasEntityContentResponse()) { - var data = vcResponseMsg.getEntityContentResponse().getData(); - ((EntityContentGitRequest) request).getFuture().set(toData(data)); + TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse(); + String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()) + .computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); + msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); + log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex()); + if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) { + log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId()); + String data = String.join("", msgChunks); + ((EntityContentGitRequest) request).getFuture().set(toData(data)); + } else { + completed = false; + } } else if (vcResponseMsg.hasEntitiesContentResponse()) { - var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList(); - ((EntitiesContentGitRequest) request).getFuture() - .set(dataList.stream().map(this::toData).collect(Collectors.toList())); + TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse(); + TransportProtos.EntityContentResponseMsg item = responseMsg.getItem(); + if (responseMsg.getItemsCount() > 0) { + Map chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()); + String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> { + return new String[item.getChunksCount()]; + }); + itemChunks[item.getChunkIndex()] = item.getData(); + if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream() + .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { + ((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream() + .map(chunks -> String.join("", chunks)) + .map(this::toData) + .collect(Collectors.toList())); + } else { + completed = false; + } + } else { + ((EntitiesContentGitRequest) request).getFuture().set(Collections.emptyList()); + } } else if (vcResponseMsg.hasVersionsDiffResponse()) { TransportProtos.VersionsDiffResponseMsg diffResponse = vcResponseMsg.getVersionsDiffResponse(); List entityVersionsDiffList = diffResponse.getDiffList().stream() @@ -416,16 +468,29 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu ((ContentsDiffGitRequest) request).getFuture().set(diff); } } + if (completed) { + removePendingRequest(requestId); + } } private void processTimeout(UUID requestId) { - PendingGitRequest pendingRequest = pendingRequestMap.remove(requestId); + PendingGitRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout); pendingRequest.getFuture().setException(new TimeoutException("Request timed out")); } } + private PendingGitRequest removePendingRequest(UUID requestId) { + PendingGitRequest pendingRequest = pendingRequestMap.remove(requestId); + if (pendingRequest != null && pendingRequest.getTimeoutTask() != null) { + pendingRequest.getTimeoutTask().cancel(true); + pendingRequest.setTimeoutTask(null); + } + chunkedMsgs.remove(requestId); + return pendingRequest; + } + private PageData toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) { var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList()); return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext()); @@ -458,16 +523,18 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu }; } - private static TbQueueCallback wrap(SettableFuture future, T value) { + private TbQueueCallback wrap(SettableFuture future, T value, UUID requestId) { return new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { future.set(value); + removePendingRequest(requestId); } @Override public void onFailure(Throwable t) { future.setException(t); + removePendingRequest(requestId); } }; } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 07233bb241..645daa58e7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1024,6 +1024,7 @@ queue: poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}" + msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}" js: # JS Eval request topic request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 3079978bce..6ab364012d 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -707,7 +707,10 @@ message PrepareMsg { message AddMsg { string relativePath = 1; - string entityDataJson = 2; + string entityDataJsonChunk = 2; + string chunkedMsgId = 3; + int32 chunkIndex = 4; + int32 chunksCount = 5; } message DeleteMsg { @@ -747,9 +750,8 @@ message ListVersionsResponseMsg { } message ListEntitiesRequestMsg { - string branchName = 1; - string versionId = 2; - string entityType = 3; + string versionId = 1; + string entityType = 2; } message VersionedEntityInfoProto { @@ -778,6 +780,9 @@ message EntityContentRequestMsg { message EntityContentResponseMsg { string data = 1; + string chunkedMsgId = 2; + int32 chunkIndex = 3; + int32 chunksCount = 4; } message EntitiesContentRequestMsg { @@ -788,7 +793,8 @@ message EntitiesContentRequestMsg { } message EntitiesContentResponseMsg { - repeated string data = 1; + EntityContentResponseMsg item = 1; + int32 itemsCount = 2; } message VersionsDiffRequestMsg { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java index ff3d3c0fd4..f76757f522 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.data; +import com.google.common.base.Splitter; + import static org.apache.commons.lang3.StringUtils.repeat; public class StringUtils { @@ -92,4 +94,8 @@ public class StringUtils { return input.substring(0, startIndexInclusive) + obfuscatedPart + input.substring(endIndexExclusive); } + public static Iterable split(String value, int maxPartSize) { + return Splitter.fixedLength(maxPartSize).split(value); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java index 800583b0b8..dae2b99106 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java @@ -24,5 +24,4 @@ import org.thingsboard.server.common.data.sync.ie.EntityExportData; public class EntityDataDiff { private EntityExportData currentVersion; private EntityExportData otherVersion; - private String rawDiff; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java index fd278cde1f..ce8c91f4ae 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java @@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.EntityId; @NoArgsConstructor public class VersionedEntityInfo { private EntityId externalId; - // etc.. public VersionedEntityInfo(EntityId externalId) { this.externalId = externalId; diff --git a/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java index 7e39f9a273..adcffb8d21 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java @@ -39,4 +39,12 @@ public class CollectionsUtil { return isNotEmpty(collection) && collection.contains(element); } + public static int countNonNull(T[] array) { + int count = 0; + for (T t : array) { + if (t != null) count++; + } + return count; + } + } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index b28618372e..20188d8145 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.sync.vc; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +29,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.CollectionsUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; @@ -88,6 +90,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -122,6 +125,8 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private long packProcessingTimeout; @Value("${vc.git.io_pool_size:3}") private int ioPoolSize; + @Value("${queue.vc.msg-chunk-size:500000}") + private int msgChunkSize; //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially. private final List ioThreads = new ArrayList<>(); @@ -269,19 +274,49 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe String path = getRelativePath(entityType, null); var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path) .stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList()); - var response = EntitiesContentResponseMsg.newBuilder(); - for (VersionedEntityInfo info : ids) { - var data = vcService.getFileContentAtCommit(ctx.getTenantId(), - getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); - response.addData(data); + if (!ids.isEmpty()) { + for (VersionedEntityInfo info : ids) { + var data = vcService.getFileContentAtCommit(ctx.getTenantId(), + getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); + + Iterable dataChunks = StringUtils.split(data, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(dataChunks); + AtomicInteger chunkIndex = new AtomicInteger(); + dataChunks.forEach(chunk -> { + EntitiesContentResponseMsg.Builder response = EntitiesContentResponseMsg.newBuilder() + .setItemsCount(ids.size()) + .setItem(EntityContentResponseMsg.newBuilder() + .setData(chunk) + .setChunkedMsgId(chunkedMsgId) + .setChunksCount(chunksCount) + .setChunkIndex(chunkIndex.getAndIncrement()) + .build()); + reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response)); + }); + } + } else { + reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse( + EntitiesContentResponseMsg.newBuilder() + .setItemsCount(0))); } - reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response)); } private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); - reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder().setData(data))); + + Iterable dataChunks = StringUtils.split(data, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(dataChunks); + + AtomicInteger chunkIndex = new AtomicInteger(); + dataChunks.forEach(chunk -> { + log.trace("[{}] sending chunk {} for 'getEntity'", chunkedMsgId, chunkIndex.get()); + reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder() + .setData(chunk).setChunkedMsgId(chunkedMsgId).setChunksCount(chunksCount) + .setChunkIndex(chunkIndex.getAndIncrement()))); + }); } private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception { @@ -412,7 +447,16 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException { - vcService.add(commit, addMsg.getRelativePath(), addMsg.getEntityDataJson()); + log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex()); + Map chunkedMsgs = commit.getChunkedMsgs(); + String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]); + msgChunks[addMsg.getChunkIndex()] = addMsg.getEntityDataJsonChunk(); + if (CollectionsUtil.countNonNull(msgChunks) == msgChunks.length) { + log.trace("[{}] collected all chunks for 'addToCommit'", addMsg.getChunkedMsgId()); + String entityDataJson = String.join("", msgChunks); + chunkedMsgs.remove(addMsg.getChunkedMsgId()); + vcService.add(commit, addMsg.getRelativePath(), entityDataJson); + } } private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current) { diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java index ccd5fc685e..20b1991c9f 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java @@ -18,7 +18,9 @@ package org.thingsboard.server.service.sync.vc; import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; @Data public class PendingCommit { @@ -31,9 +33,10 @@ public class PendingCommit { private String versionName; private String authorName; - private String authorEmail; + private Map chunkedMsgs; + public PendingCommit(TenantId tenantId, String nodeId, UUID txId, String branch, String versionName, String authorName, String authorEmail) { this.tenantId = tenantId; this.nodeId = nodeId; @@ -44,4 +47,12 @@ public class PendingCommit { this.authorEmail = authorEmail; this.workingBranch = txId.toString(); } + + public Map getChunkedMsgs() { + if (chunkedMsgs == null) { + chunkedMsgs = new ConcurrentHashMap<>(); + } + return chunkedMsgs; + } + } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index a97c4cfd37..1344ea194a 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -168,6 +168,7 @@ queue: partitions: "${TB_QUEUE_VC_PARTITIONS:10}" poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" + msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}" vc: # Pool size for handling export tasks diff --git a/ui-ngx/src/app/shared/models/vc.models.ts b/ui-ngx/src/app/shared/models/vc.models.ts index 98f66f74a9..aa41efba63 100644 --- a/ui-ngx/src/app/shared/models/vc.models.ts +++ b/ui-ngx/src/app/shared/models/vc.models.ts @@ -228,7 +228,6 @@ export interface RuleChainExportData extends EntityExportData { export interface EntityDataDiff { currentVersion: EntityExportData; otherVersion: EntityExportData; - rawDiff: string; } export function entityExportDataToJsonString(data: EntityExportData): string {