Keep data order when getting chunked messages

This commit is contained in:
Viacheslav Klimov 2022-07-01 15:31:29 +03:00
parent 4a1fc9a14a
commit a86eb53038
3 changed files with 15 additions and 9 deletions

View File

@ -289,7 +289,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
return cachePut(ctx.getRequestId(), onError(e.getExternalId(), e.getCause())); return cachePut(ctx.getRequestId(), onError(e.getExternalId(), e.getCause()));
} catch (Exception e) { } catch (Exception e) {
log.info("[{}] Failed to process request [{}] due to: ", ctx.getTenantId(), request, e); log.info("[{}] Failed to process request [{}] due to: ", ctx.getTenantId(), request, e);
return cachePut(ctx.getRequestId(), VersionLoadResult.error(EntityLoadError.runtimeError(e.getMessage()))); return cachePut(ctx.getRequestId(), VersionLoadResult.error(EntityLoadError.runtimeError(e)));
} }
} }
@ -433,7 +433,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
} }
private VersionLoadResult onError(EntityId externalId, Throwable e) { private VersionLoadResult onError(EntityId externalId, Throwable e) {
return analyze(e, externalId).orElse(VersionLoadResult.error(EntityLoadError.runtimeError(e.getMessage()))); return analyze(e, externalId).orElse(VersionLoadResult.error(EntityLoadError.runtimeError(e)));
} }
private Optional<VersionLoadResult> analyze(Throwable e, EntityId externalId) { private Optional<VersionLoadResult> analyze(Throwable e, EntityId externalId) {
@ -605,7 +605,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
private void processLoadError(EntitiesImportCtx ctx, Throwable e) { private void processLoadError(EntitiesImportCtx ctx, Throwable e) {
log.debug("[{}] Failed to load the commit: {}", ctx.getRequestId(), ctx.getVersionId(), e); log.debug("[{}] Failed to load the commit: {}", ctx.getRequestId(), ctx.getVersionId(), e);
cachePut(ctx.getRequestId(), VersionLoadResult.error(EntityLoadError.runtimeError(e.getMessage()))); cachePut(ctx.getRequestId(), VersionLoadResult.error(EntityLoadError.runtimeError(e)));
} }
private void cachePut(UUID requestId, VersionCreationResult result) { private void cachePut(UUID requestId, VersionCreationResult result) {

View File

@ -77,6 +77,7 @@ import org.thingsboard.server.service.sync.vc.data.VoidGitRequest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -101,7 +102,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>(); private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>();
private final Map<UUID, Map<String, String[]>> chunkedMsgs = new ConcurrentHashMap<>(); private final Map<UUID, LinkedHashMap<String, String[]>> chunkedMsgs = new ConcurrentHashMap<>();
@Value("${queue.vc.request-timeout:60000}") @Value("${queue.vc.request-timeout:60000}")
private int requestTimeout; private int requestTimeout;
@ -285,7 +286,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) { public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
chunkedMsgs.put(request.getRequestId(), new HashMap<>()); chunkedMsgs.put(request.getRequestId(), new LinkedHashMap<>());
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
.setVersionId(versionId) .setVersionId(versionId)
.setEntityType(entityId.getEntityType().name()) .setEntityType(entityId.getEntityType().name())
@ -327,7 +328,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
chunkedMsgs.put(request.getRequestId(), new HashMap<>()); chunkedMsgs.put(request.getRequestId(), new LinkedHashMap<>());
registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder()
.setVersionId(versionId) .setVersionId(versionId)
.setEntityType(entityType.name()) .setEntityType(entityType.name())

View File

@ -18,10 +18,11 @@ package org.thingsboard.server.common.data.sync.vc;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
@Data @Data
@Builder @Builder
@ -43,8 +44,12 @@ public class EntityLoadError implements Serializable {
return EntityLoadError.builder().type("MISSING_REFERENCED_ENTITY").source(sourceId).target(targetId).build(); return EntityLoadError.builder().type("MISSING_REFERENCED_ENTITY").source(sourceId).target(targetId).build();
} }
public static EntityLoadError runtimeError(String msg) { public static EntityLoadError runtimeError(Throwable e) {
return EntityLoadError.builder().type("RUNTIME").message(msg).build(); String message = e.getMessage();
if (StringUtils.isEmpty(message)) {
message = "unexpected error (" + ClassUtils.getShortClassName(e.getClass()) + ")";
}
return EntityLoadError.builder().type("RUNTIME").message(message).build();
} }
} }