VC queue message chunking

This commit is contained in:
Viacheslav Klimov 2022-06-22 18:06:50 +03:00
parent 0c7dd1ec23
commit d6b516e708
13 changed files with 195 additions and 49 deletions

View File

@ -160,6 +160,7 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
protected void cleanupForComparison(E e) { protected void cleanupForComparison(E e) {
e.setTenantId(null); e.setTenantId(null);
e.setExternalId(null);
e.setCreatedTime(0); e.setCreatedTime(0);
} }

View File

@ -362,6 +362,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
.build(); .build();
} }
@SneakyThrows
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
private void importEntities(EntitiesImportCtx ctx, EntityType entityType) { private void importEntities(EntitiesImportCtx ctx, EntityType entityType) {
int limit = 100; int limit = 100;
@ -373,9 +374,9 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.debug("[{}] Loading {} entities pack ({})", ctx.getTenantId(), entityType, entityDataList.size());
for (EntityExportData entityData : entityDataList) { for (EntityExportData entityData : entityDataList) {
EntityExportData reimportBackup = JacksonUtil.clone(entityData); EntityExportData reimportBackup = JacksonUtil.clone(entityData);
log.debug("[{}] Loading {} entities", ctx.getTenantId(), entityType);
EntityImportResult<?> importResult; EntityImportResult<?> importResult;
try { try {
importResult = exportImportService.importEntity(ctx, entityData); importResult = exportImportService.importEntity(ctx, entityData);
@ -391,6 +392,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>()) ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>())
.add(importResult.getSavedEntity().getId()); .add(importResult.getSavedEntity().getId());
} }
log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId());
offset += limit; offset += limit;
} while (entityDataList.size() == limit); } while (entityDataList.size() == limit);
} }
@ -456,18 +458,20 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
EntityId externalId = ((ExportableEntity<EntityId>) entity).getExternalId(); EntityId externalId = ((ExportableEntity<EntityId>) entity).getExternalId();
if (externalId == null) externalId = entityId; if (externalId == null) externalId = entityId;
return transformAsync(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId), return transform(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId),
otherVersion -> { otherVersion -> {
SimpleEntitiesExportCtx ctx = new SimpleEntitiesExportCtx(user, null, null, EntityExportSettings.builder() SimpleEntitiesExportCtx ctx = new SimpleEntitiesExportCtx(user, null, null, EntityExportSettings.builder()
.exportRelations(otherVersion.hasRelations()) .exportRelations(otherVersion.hasRelations())
.exportAttributes(otherVersion.hasAttributes()) .exportAttributes(otherVersion.hasAttributes())
.exportCredentials(otherVersion.hasCredentials()) .exportCredentials(otherVersion.hasCredentials())
.build()); .build());
EntityExportData<?> currentVersion = exportImportService.exportEntity(ctx, entityId); EntityExportData<?> currentVersion;
return transform(gitServiceQueue.getContentsDiff(user.getTenantId(), try {
JacksonUtil.toPrettyString(currentVersion.sort()), currentVersion = exportImportService.exportEntity(ctx, entityId);
JacksonUtil.toPrettyString(otherVersion.sort())), } catch (ThingsboardException e) {
rawDiff -> new EntityDataDiff(currentVersion, otherVersion, rawDiff), MoreExecutors.directExecutor()); throw new RuntimeException(e);
}
return new EntityDataDiff(currentVersion.sort(), otherVersion.sort());
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }

View File

@ -15,7 +15,10 @@
*/ */
package org.thingsboard.server.service.sync.vc; package org.thingsboard.server.service.sync.vc;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -23,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.CollectionsUtil;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
@ -70,12 +74,15 @@ import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest;
import org.thingsboard.server.service.sync.vc.data.VoidGitRequest; import org.thingsboard.server.service.sync.vc.data.VoidGitRequest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -92,9 +99,12 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>(); private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>();
private final Map<UUID, Map<String, String[]>> chunkedMsgs = new ConcurrentHashMap<>();
@Value("${queue.vc.request-timeout:60000}") @Value("${queue.vc.request-timeout:60000}")
private int requestTimeout; private int requestTimeout;
@Value("${queue.vc.msg-chunk-size:500000}")
private int msgChunkSize;
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
DataDecodingEncodingService encodingService, DataDecodingEncodingService encodingService,
@ -114,24 +124,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request);
registerAndSend(commit, builder -> builder.setCommitRequest( registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build()
).build(), wrap(future, commit)); ).build(), wrap(future, commit, commit.getRequestId()));
return future; return future;
} }
@SuppressWarnings("UnstableApiUsage")
@Override @Override
public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) { public ListenableFuture<Void> addToCommit(CommitGitRequest commit, EntityExportData<ExportableEntity<EntityId>> entityData) {
SettableFuture<Void> future = SettableFuture.create();
String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId()); String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId());
String entityDataJson = JacksonUtil.toPrettyString(entityData.sort()); String entityDataJson = JacksonUtil.toPrettyString(entityData.sort());
registerAndSend(commit, builder -> builder.setCommitRequest( Iterable<String> entityDataChunks = StringUtils.split(entityDataJson, msgChunkSize);
buildCommitRequest(commit).setAddMsg( String chunkedMsgId = UUID.randomUUID().toString();
TransportProtos.AddMsg.newBuilder() int chunksCount = Iterables.size(entityDataChunks);
.setRelativePath(path).setEntityDataJson(entityDataJson).build()
).build() AtomicInteger chunkIndex = new AtomicInteger();
).build(), wrap(future, null)); List<ListenableFuture<Void>> futures = new ArrayList<>();
return future; entityDataChunks.forEach(chunk -> {
SettableFuture<Void> chunkFuture = SettableFuture.create();
log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get());
registerAndSend(commit, builder -> builder.setCommitRequest(
buildCommitRequest(commit).setAddMsg(
TransportProtos.AddMsg.newBuilder()
.setRelativePath(path).setEntityDataJsonChunk(chunk)
.setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
.setChunksCount(chunksCount).build()
).build()
).build(), wrap(chunkFuture, null, commit.getRequestId()));
futures.add(chunkFuture);
});
return Futures.transform(Futures.allAsList(futures), r -> {
log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId);
return null;
}, MoreExecutors.directExecutor());
} }
@Override @Override
@ -144,7 +169,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
buildCommitRequest(commit).setDeleteMsg( buildCommitRequest(commit).setDeleteMsg(
TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build()
).build() ).build()
).build(), wrap(future, null)); ).build(), wrap(future, null, commit.getRequestId()));
return future; return future;
} }
@ -220,7 +245,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override @Override
public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) {
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder()
.setBranchName(branch)
.setVersionId(versionId) .setVersionId(versionId)
.setEntityType(entityType.name()) .setEntityType(entityType.name())
.build()); .build());
@ -229,7 +253,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override @Override
public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { public ListenableFuture<List<VersionedEntityInfo>> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) {
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder()
.setBranchName(branch)
.setVersionId(versionId) .setVersionId(versionId)
.build()); .build());
} }
@ -289,9 +312,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var requestBody = enrichFunction.apply(newRequestProto(request, settings)); var requestBody = enrichFunction.apply(newRequestProto(request, settings));
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
request.setTimeoutTask(scheduler.schedule(() -> { if (request.getTimeoutTask() == null) {
processTimeout(request.getRequestId()); request.setTimeoutTask(scheduler.schedule(() -> {
}, requestTimeout, TimeUnit.MILLISECONDS)); processTimeout(request.getRequestId());
}, requestTimeout, TimeUnit.MILLISECONDS));
}
} else { } else {
throw new RuntimeException("Future is already done!"); throw new RuntimeException("Future is already done!");
} }
@ -355,15 +380,15 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override @Override
public void processResponse(VersionControlResponseMsg vcResponseMsg) { public void processResponse(VersionControlResponseMsg vcResponseMsg) {
UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB()); UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB());
PendingGitRequest<?> request = pendingRequestMap.remove(requestId); PendingGitRequest<?> request = pendingRequestMap.get(requestId);
if (request == null) { if (request == null) {
log.debug("[{}] received stale response: {}", requestId, vcResponseMsg); log.debug("[{}] received stale response: {}", requestId, vcResponseMsg);
return; return;
} else { } else {
log.debug("[{}] processing response: {}", requestId, vcResponseMsg); log.debug("[{}] processing response: {}", requestId, vcResponseMsg);
request.getTimeoutTask().cancel(true);
} }
var future = request.getFuture(); var future = request.getFuture();
boolean completed = true;
if (!StringUtils.isEmpty(vcResponseMsg.getError())) { if (!StringUtils.isEmpty(vcResponseMsg.getError())) {
future.setException(new RuntimeException(vcResponseMsg.getError())); future.setException(new RuntimeException(vcResponseMsg.getError()));
} else { } else {
@ -391,12 +416,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var listVersionsResponse = vcResponseMsg.getListVersionsResponse(); var listVersionsResponse = vcResponseMsg.getListVersionsResponse();
((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); ((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse));
} else if (vcResponseMsg.hasEntityContentResponse()) { } else if (vcResponseMsg.hasEntityContentResponse()) {
var data = vcResponseMsg.getEntityContentResponse().getData(); TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse();
((EntityContentGitRequest) request).getFuture().set(toData(data)); String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>())
.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]);
msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData();
log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex());
if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) {
log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId());
String data = String.join("", msgChunks);
((EntityContentGitRequest) request).getFuture().set(toData(data));
} else {
completed = false;
}
} else if (vcResponseMsg.hasEntitiesContentResponse()) { } else if (vcResponseMsg.hasEntitiesContentResponse()) {
var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList(); TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse();
((EntitiesContentGitRequest) request).getFuture() TransportProtos.EntityContentResponseMsg item = responseMsg.getItem();
.set(dataList.stream().map(this::toData).collect(Collectors.toList())); if (responseMsg.getItemsCount() > 0) {
Map<String, String[]> chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>());
String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> {
return new String[item.getChunksCount()];
});
itemChunks[item.getChunkIndex()] = item.getData();
if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream()
.allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) {
((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream()
.map(chunks -> String.join("", chunks))
.map(this::toData)
.collect(Collectors.toList()));
} else {
completed = false;
}
} else {
((EntitiesContentGitRequest) request).getFuture().set(Collections.emptyList());
}
} else if (vcResponseMsg.hasVersionsDiffResponse()) { } else if (vcResponseMsg.hasVersionsDiffResponse()) {
TransportProtos.VersionsDiffResponseMsg diffResponse = vcResponseMsg.getVersionsDiffResponse(); TransportProtos.VersionsDiffResponseMsg diffResponse = vcResponseMsg.getVersionsDiffResponse();
List<EntityVersionsDiff> entityVersionsDiffList = diffResponse.getDiffList().stream() List<EntityVersionsDiff> entityVersionsDiffList = diffResponse.getDiffList().stream()
@ -416,16 +468,29 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
((ContentsDiffGitRequest) request).getFuture().set(diff); ((ContentsDiffGitRequest) request).getFuture().set(diff);
} }
} }
if (completed) {
removePendingRequest(requestId);
}
} }
private void processTimeout(UUID requestId) { private void processTimeout(UUID requestId) {
PendingGitRequest<?> pendingRequest = pendingRequestMap.remove(requestId); PendingGitRequest<?> pendingRequest = removePendingRequest(requestId);
if (pendingRequest != null) { if (pendingRequest != null) {
log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout); log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout);
pendingRequest.getFuture().setException(new TimeoutException("Request timed out")); pendingRequest.getFuture().setException(new TimeoutException("Request timed out"));
} }
} }
private PendingGitRequest<?> removePendingRequest(UUID requestId) {
PendingGitRequest<?> pendingRequest = pendingRequestMap.remove(requestId);
if (pendingRequest != null && pendingRequest.getTimeoutTask() != null) {
pendingRequest.getTimeoutTask().cancel(true);
pendingRequest.setTimeoutTask(null);
}
chunkedMsgs.remove(requestId);
return pendingRequest;
}
private PageData<EntityVersion> toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) { private PageData<EntityVersion> toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) {
var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList()); var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList());
return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext()); return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext());
@ -458,16 +523,18 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
}; };
} }
private static <T> TbQueueCallback wrap(SettableFuture<T> future, T value) { private <T> TbQueueCallback wrap(SettableFuture<T> future, T value, UUID requestId) {
return new TbQueueCallback() { return new TbQueueCallback() {
@Override @Override
public void onSuccess(TbQueueMsgMetadata metadata) { public void onSuccess(TbQueueMsgMetadata metadata) {
future.set(value); future.set(value);
removePendingRequest(requestId);
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
future.setException(t); future.setException(t);
removePendingRequest(requestId);
} }
}; };
} }

View File

@ -1024,6 +1024,7 @@ queue:
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}" request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}"
msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}"
js: js:
# JS Eval request topic # JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"

View File

@ -707,7 +707,10 @@ message PrepareMsg {
message AddMsg { message AddMsg {
string relativePath = 1; string relativePath = 1;
string entityDataJson = 2; string entityDataJsonChunk = 2;
string chunkedMsgId = 3;
int32 chunkIndex = 4;
int32 chunksCount = 5;
} }
message DeleteMsg { message DeleteMsg {
@ -747,9 +750,8 @@ message ListVersionsResponseMsg {
} }
message ListEntitiesRequestMsg { message ListEntitiesRequestMsg {
string branchName = 1; string versionId = 1;
string versionId = 2; string entityType = 2;
string entityType = 3;
} }
message VersionedEntityInfoProto { message VersionedEntityInfoProto {
@ -778,6 +780,9 @@ message EntityContentRequestMsg {
message EntityContentResponseMsg { message EntityContentResponseMsg {
string data = 1; string data = 1;
string chunkedMsgId = 2;
int32 chunkIndex = 3;
int32 chunksCount = 4;
} }
message EntitiesContentRequestMsg { message EntitiesContentRequestMsg {
@ -788,7 +793,8 @@ message EntitiesContentRequestMsg {
} }
message EntitiesContentResponseMsg { message EntitiesContentResponseMsg {
repeated string data = 1; EntityContentResponseMsg item = 1;
int32 itemsCount = 2;
} }
message VersionsDiffRequestMsg { message VersionsDiffRequestMsg {

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.server.common.data; package org.thingsboard.server.common.data;
import com.google.common.base.Splitter;
import static org.apache.commons.lang3.StringUtils.repeat; import static org.apache.commons.lang3.StringUtils.repeat;
public class StringUtils { public class StringUtils {
@ -92,4 +94,8 @@ public class StringUtils {
return input.substring(0, startIndexInclusive) + obfuscatedPart + input.substring(endIndexExclusive); return input.substring(0, startIndexInclusive) + obfuscatedPart + input.substring(endIndexExclusive);
} }
public static Iterable<String> split(String value, int maxPartSize) {
return Splitter.fixedLength(maxPartSize).split(value);
}
} }

View File

@ -24,5 +24,4 @@ import org.thingsboard.server.common.data.sync.ie.EntityExportData;
public class EntityDataDiff { public class EntityDataDiff {
private EntityExportData<?> currentVersion; private EntityExportData<?> currentVersion;
private EntityExportData<?> otherVersion; private EntityExportData<?> otherVersion;
private String rawDiff;
} }

View File

@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.EntityId;
@NoArgsConstructor @NoArgsConstructor
public class VersionedEntityInfo { public class VersionedEntityInfo {
private EntityId externalId; private EntityId externalId;
// etc..
public VersionedEntityInfo(EntityId externalId) { public VersionedEntityInfo(EntityId externalId) {
this.externalId = externalId; this.externalId = externalId;

View File

@ -39,4 +39,12 @@ public class CollectionsUtil {
return isNotEmpty(collection) && collection.contains(element); return isNotEmpty(collection) && collection.contains(element);
} }
public static <T> int countNonNull(T[] array) {
int count = 0;
for (T t : array) {
if (t != null) count++;
}
return count;
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.sync.vc; package org.thingsboard.server.service.sync.vc;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -28,6 +29,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.CollectionsUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
@ -88,6 +90,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
@ -122,6 +125,8 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private long packProcessingTimeout; private long packProcessingTimeout;
@Value("${vc.git.io_pool_size:3}") @Value("${vc.git.io_pool_size:3}")
private int ioPoolSize; private int ioPoolSize;
@Value("${queue.vc.msg-chunk-size:500000}")
private int msgChunkSize;
//We need to manually manage the threads since tasks for particular tenant need to be processed sequentially. //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially.
private final List<ListeningExecutorService> ioThreads = new ArrayList<>(); private final List<ListeningExecutorService> ioThreads = new ArrayList<>();
@ -269,19 +274,49 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
String path = getRelativePath(entityType, null); String path = getRelativePath(entityType, null);
var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path) var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path)
.stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList()); .stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList());
var response = EntitiesContentResponseMsg.newBuilder(); if (!ids.isEmpty()) {
for (VersionedEntityInfo info : ids) { for (VersionedEntityInfo info : ids) {
var data = vcService.getFileContentAtCommit(ctx.getTenantId(), var data = vcService.getFileContentAtCommit(ctx.getTenantId(),
getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId());
response.addData(data);
Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
String chunkedMsgId = UUID.randomUUID().toString();
int chunksCount = Iterables.size(dataChunks);
AtomicInteger chunkIndex = new AtomicInteger();
dataChunks.forEach(chunk -> {
EntitiesContentResponseMsg.Builder response = EntitiesContentResponseMsg.newBuilder()
.setItemsCount(ids.size())
.setItem(EntityContentResponseMsg.newBuilder()
.setData(chunk)
.setChunkedMsgId(chunkedMsgId)
.setChunksCount(chunksCount)
.setChunkIndex(chunkIndex.getAndIncrement())
.build());
reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response));
});
}
} else {
reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(
EntitiesContentResponseMsg.newBuilder()
.setItemsCount(0)));
} }
reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response));
} }
private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException {
String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString());
String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId());
reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder().setData(data)));
Iterable<String> dataChunks = StringUtils.split(data, msgChunkSize);
String chunkedMsgId = UUID.randomUUID().toString();
int chunksCount = Iterables.size(dataChunks);
AtomicInteger chunkIndex = new AtomicInteger();
dataChunks.forEach(chunk -> {
log.trace("[{}] sending chunk {} for 'getEntity'", chunkedMsgId, chunkIndex.get());
reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder()
.setData(chunk).setChunkedMsgId(chunkedMsgId).setChunksCount(chunksCount)
.setChunkIndex(chunkIndex.getAndIncrement())));
});
} }
private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception { private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception {
@ -412,7 +447,16 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
} }
private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException { private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException {
vcService.add(commit, addMsg.getRelativePath(), addMsg.getEntityDataJson()); log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex());
Map<String, String[]> chunkedMsgs = commit.getChunkedMsgs();
String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]);
msgChunks[addMsg.getChunkIndex()] = addMsg.getEntityDataJsonChunk();
if (CollectionsUtil.countNonNull(msgChunks) == msgChunks.length) {
log.trace("[{}] collected all chunks for 'addToCommit'", addMsg.getChunkedMsgId());
String entityDataJson = String.join("", msgChunks);
chunkedMsgs.remove(addMsg.getChunkedMsgId());
vcService.add(commit, addMsg.getRelativePath(), entityDataJson);
}
} }
private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current) { private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current) {

View File

@ -18,7 +18,9 @@ package org.thingsboard.server.service.sync.vc;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Data @Data
public class PendingCommit { public class PendingCommit {
@ -31,9 +33,10 @@ public class PendingCommit {
private String versionName; private String versionName;
private String authorName; private String authorName;
private String authorEmail; private String authorEmail;
private Map<String, String[]> chunkedMsgs;
public PendingCommit(TenantId tenantId, String nodeId, UUID txId, String branch, String versionName, String authorName, String authorEmail) { public PendingCommit(TenantId tenantId, String nodeId, UUID txId, String branch, String versionName, String authorName, String authorEmail) {
this.tenantId = tenantId; this.tenantId = tenantId;
this.nodeId = nodeId; this.nodeId = nodeId;
@ -44,4 +47,12 @@ public class PendingCommit {
this.authorEmail = authorEmail; this.authorEmail = authorEmail;
this.workingBranch = txId.toString(); this.workingBranch = txId.toString();
} }
public Map<String, String[]> getChunkedMsgs() {
if (chunkedMsgs == null) {
chunkedMsgs = new ConcurrentHashMap<>();
}
return chunkedMsgs;
}
} }

View File

@ -168,6 +168,7 @@ queue:
partitions: "${TB_QUEUE_VC_PARTITIONS:10}" partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}"
vc: vc:
# Pool size for handling export tasks # Pool size for handling export tasks

View File

@ -228,7 +228,6 @@ export interface RuleChainExportData extends EntityExportData<RuleChain> {
export interface EntityDataDiff { export interface EntityDataDiff {
currentVersion: EntityExportData<any>; currentVersion: EntityExportData<any>;
otherVersion: EntityExportData<any>; otherVersion: EntityExportData<any>;
rawDiff: string;
} }
export function entityExportDataToJsonString(data: EntityExportData<any>): string { export function entityExportDataToJsonString(data: EntityExportData<any>): string {