Implementation of the API

This commit is contained in:
Andrii Shvaika 2022-05-24 11:38:03 +03:00
parent db34f7766f
commit 1065fd9fbe
8 changed files with 188 additions and 319 deletions

View File

@ -21,17 +21,16 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings;
@ -61,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@TbCoreComponent
@Service
@ -209,6 +209,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
}
@Override
@SuppressWarnings("rawtypes")
public ListenableFuture<EntityExportData> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
@ -218,14 +219,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build()
, wrap(request.getFuture()));
return request.getFuture();
// try {
// String entityDataJson = gitRepositoryService.getFileContentAtCommit(tenantId,
// getRelativePath(entityId.getEntityType(), entityId.getId().toString()), versionId);
// return JacksonUtil.fromString(entityDataJson, EntityExportData.class);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
}
private <T> void registerAndSend(PendingGitRequest<T> request,
@ -237,13 +230,16 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
Function<ToVersionControlServiceMsg.Builder, ToVersionControlServiceMsg> enrichFunction, EntitiesVersionControlSettings settings, TbQueueCallback callback) {
if (!request.getFuture().isDone()) {
pendingRequestMap.putIfAbsent(request.getRequestId(), request);
clusterService.pushMsgToVersionControl(request.getTenantId(), enrichFunction.apply(newRequestProto(request, settings)), callback);
var requestBody = enrichFunction.apply(newRequestProto(request, settings));
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
} else {
throw new RuntimeException("Future is already done!");
}
}
@Override
@SuppressWarnings("rawtypes")
public ListenableFuture<List<EntityExportData>> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
@ -313,10 +309,41 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
commitResult.setRemoved(commitResponse.getRemoved());
commitResult.setModified(commitResponse.getModified());
((CommitGitRequest) request).getFuture().set(commitResult);
} else if (vcResponseMsg.hasListBranchesResponse()) {
var listBranchesResponse = vcResponseMsg.getListBranchesResponse();
((ListBranchesGitRequest) request).getFuture().set(listBranchesResponse.getBranchesList());
} else if (vcResponseMsg.hasListEntitiesResponse()) {
var listEntitiesResponse = vcResponseMsg.getListEntitiesResponse();
((ListEntitiesGitRequest) request).getFuture().set(
listEntitiesResponse.getEntitiesList().stream().map(this::getVersionedEntityInfo).collect(Collectors.toList()));
} else if (vcResponseMsg.hasListVersionsResponse()) {
var listVersionsResponse = vcResponseMsg.getListVersionsResponse();
((ListVersionsGitRequest) request).getFuture().set(
listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList()));
} else if (vcResponseMsg.hasEntityContentResponse()) {
var data = vcResponseMsg.getEntityContentResponse().getData();
((EntityContentGitRequest) request).getFuture().set(toData(data));
} else if (vcResponseMsg.hasEntitiesContentResponse()) {
var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList();
((EntitiesContentGitRequest) request).getFuture()
.set(dataList.stream().map(this::toData).collect(Collectors.toList()));
}
}
}
private EntityVersion getEntityVersion(TransportProtos.EntityVersionProto proto) {
return new EntityVersion(proto.getId(), proto.getName());
}
private VersionedEntityInfo getVersionedEntityInfo(TransportProtos.VersionedEntityInfoProto proto) {
return new VersionedEntityInfo(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())));
}
@SuppressWarnings("rawtypes")
private EntityExportData toData(String data) {
return JacksonUtil.fromString(data, EntityExportData.class);
}
private static <T> TbQueueCallback wrap(SettableFuture<T> future) {
return new TbQueueCallback() {
@Override

View File

@ -1,290 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.sync.vc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.settings.AdminSettingsService;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
@Service
@ConditionalOnProperty(prefix = "vc", value = "git.service", havingValue = "local", matchIfMissing = true)
public class LocalGitVersionControlService {
private final ObjectWriter jsonWriter = new ObjectMapper().writer(SerializationFeature.INDENT_OUTPUT);
private final GitRepositoryService gitRepositoryService;
private final TenantDao tenantDao;
private final AdminSettingsService adminSettingsService;
private final ConcurrentMap<TenantId, Lock> tenantRepoLocks = new ConcurrentHashMap<>();
private final Map<TenantId, PendingCommit> pendingCommitMap = new HashMap<>();
//
// @AfterStartUp
// public void init() {
// DaoUtil.processInBatches(tenantDao::findTenantsIds, 100, tenantId -> {
// EntitiesVersionControlSettings settings = getVersionControlSettings(tenantId);
// if (settings != null) {
// try {
// gitRepositoryService.initRepository(tenantId, settings);
// } catch (Exception e) {
// log.warn("Failed to init repository for tenant {}", tenantId, e);
// }
// }
// });
// }
//
// @Override
// public void testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) {
// var lock = getRepoLock(tenantId);
// lock.lock();
// try {
// gitRepositoryService.testRepository(tenantId, settings);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// } finally {
// lock.unlock();
// }
// }
//
// @Override
// public void initRepository(TenantId tenantId, EntitiesVersionControlSettings settings) {
// var lock = getRepoLock(tenantId);
// lock.lock();
// try {
// gitRepositoryService.initRepository(tenantId, settings);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// } finally {
// lock.unlock();
// }
// }
//
// @Override
// public void clearRepository(TenantId tenantId) {
// var lock = getRepoLock(tenantId);
// lock.lock();
// try {
// gitRepositoryService.clearRepository(tenantId);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// } finally {
// lock.unlock();
// }
// }
//
// @Override
// public PendingCommit prepareCommit(TenantId tenantId, VersionCreateRequest request) {
// var lock = getRepoLock(tenantId);
// lock.lock();
// try {
// var pendingCommit = new PendingCommit(tenantId, request);
// PendingCommit old = pendingCommitMap.put(tenantId, pendingCommit);
// if (old != null) {
// gitRepositoryService.abort(old);
// }
// gitRepositoryService.prepareCommit(pendingCommit);
// return pendingCommit;
// } finally {
// lock.unlock();
// }
// }
//
// @Override
// public void deleteAll(PendingCommit commit, EntityType entityType) {
// doInsideLock(commit, c -> {
// try {
// gitRepositoryService.deleteFolderContent(commit, getRelativePath(entityType, null));
// } catch (IOException e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
// });
// }
//
// @Override
// public void addToCommit(PendingCommit commit, EntityExportData<ExportableEntity<EntityId>> entityData) {
// doInsideLock(commit, c -> {
// String entityDataJson;
// try {
// entityDataJson = jsonWriter.writeValueAsString(entityData);
// gitRepositoryService.add(c, getRelativePath(entityData.getEntityType(),
// entityData.getEntity().getId().toString()), entityDataJson);
// } catch (IOException e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
// });
// }
//
// @Override
// public VersionCreationResult push(PendingCommit commit) {
// return executeInsideLock(commit, gitRepositoryService::push);
// }
//
// @Override
// public List<EntityVersion> listVersions(TenantId tenantId, String branch) {
// return listVersions(tenantId, branch, (String) null);
// }
//
// @Override
// public List<EntityVersion> listVersions(TenantId tenantId, String branch, EntityType entityType) {
// return listVersions(tenantId, branch, getRelativePath(entityType, null));
// }
//
// @Override
// public List<EntityVersion> listVersions(TenantId tenantId, String branch, EntityId entityId) {
// return listVersions(tenantId, branch, getRelativePath(entityId.getEntityType(), entityId.getId().toString()));
// }
//
// @Override
// public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) {
// try {
// return gitRepositoryService.listEntitiesAtVersion(tenantId, branch, versionId, entityType != null ? getRelativePath(entityType, null) : null);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
// }
//
// @Override
// public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) {
// return listEntitiesAtVersion(tenantId, branch, versionId, null);
// }
//
// @Override
// public List<String> listBranches(TenantId tenantId) {
// return gitRepositoryService.listBranches(tenantId);
// }
//
// @Override
// public List<EntityExportData<?>> getEntities(TenantId tenantId, String branch, String versionId, EntityType entityType, int offset, int limit) {
// return listEntitiesAtVersion(tenantId, branch, versionId, entityType).stream()
// .skip(offset).limit(limit)
// .map(entityInfo -> getEntity(tenantId, versionId, entityInfo.getExternalId()))
// .collect(Collectors.toList());
// }
//
// @Override
// public EntityExportData<?> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
// try {
// String entityDataJson = gitRepositoryService.getFileContentAtCommit(tenantId,
// getRelativePath(entityId.getEntityType(), entityId.getId().toString()), versionId);
// return JacksonUtil.fromString(entityDataJson, EntityExportData.class);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
// }
//
// private EntitiesVersionControlSettings getVersionControlSettings(TenantId tenantId) {
// AdminSettings adminSettings = adminSettingsService.findAdminSettingsByKey(tenantId, EntitiesVersionControlService.SETTINGS_KEY);
// if (adminSettings != null) {
// try {
// return JacksonUtil.convertValue(adminSettings.getJsonValue(), EntitiesVersionControlSettings.class);
// } catch (Exception e) {
// throw new RuntimeException("Failed to load version control settings!", e);
// }
// }
// return null;
// }
//
// private List<EntityVersion> listVersions(TenantId tenantId, String branch, String path) {
// try {
// return gitRepositoryService.listVersions(tenantId, branch, path);
// } catch (Exception e) {
// //TODO: analyze and return meaningful exceptions that we can show to the client;
// throw new RuntimeException(e);
// }
// }
//
// private void doInsideLock(PendingCommit commit, Consumer<PendingCommit> r) {
// var lock = getRepoLock(commit.getTenantId());
// lock.lock();
// try {
// checkCommit(commit);
// r.accept(commit);
// } finally {
// lock.unlock();
// }
// }
//
// private <T> T executeInsideLock(PendingCommit commit, Function<PendingCommit, T> c) {
// var lock = getRepoLock(commit.getTenantId());
// lock.lock();
// try {
// checkCommit(commit);
// return c.apply(commit);
// } finally {
// lock.unlock();
// }
// }
//
// private void checkCommit(PendingCommit commit) {
// PendingCommit existing = pendingCommitMap.get(commit.getTenantId());
// if (existing == null || !existing.getRequestId().equals(commit.getRequestId())) {
// throw new ConcurrentModificationException();
// }
// }
//
// private String getRelativePath(EntityType entityType, String entityId) {
// String path = entityType.name().toLowerCase();
// if (entityId != null) {
// path += "/" + entityId + ".json";
// }
// return path;
// }
//
// private Lock getRepoLock(TenantId tenantId) {
// return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock());
// }
}

View File

@ -24,17 +24,19 @@ import java.util.UUID;
@Getter
public class PendingGitRequest<T> {
private final long createdTime;
private final UUID requestId;
private final TenantId tenantId;
private final SettableFuture<T> future;
public PendingGitRequest(TenantId tenantId) {
this.createdTime = System.currentTimeMillis();
this.requestId = UUID.randomUUID();
this.tenantId = tenantId;
this.future = SettableFuture.create();
}
public boolean requiresSettings(){
public boolean requiresSettings() {
return true;
}
}

View File

@ -723,15 +723,39 @@ message ListVersionsRequestMsg {
int64 entityIdLSB = 4;
}
message EntityVersionProto {
int64 ts = 1;
string id = 2;
string name = 3;
}
message ListVersionsResponseMsg {
repeated EntityVersionProto versions = 1;
}
message ListEntitiesRequestMsg {
string branchName = 1;
string versionId = 2;
string entityType = 3;
}
message VersionedEntityInfoProto {
string entityType = 1;
int64 entityIdMSB = 2;
int64 entityIdLSB = 3;
}
message ListEntitiesResponseMsg {
repeated VersionedEntityInfoProto entities = 1;
}
message ListBranchesRequestMsg {
}
message ListBranchesResponseMsg {
repeated string branches = 1;
}
message EntityContentRequestMsg {
string versionId = 1;
string entityType = 2;
@ -739,6 +763,10 @@ message EntityContentRequestMsg {
int64 entityIdLSB = 4;
}
message EntityContentResponseMsg {
string data = 1;
}
message EntitiesContentRequestMsg {
string versionId = 1;
string entityType = 2;
@ -746,6 +774,10 @@ message EntitiesContentRequestMsg {
int32 limit = 4;
}
message EntitiesContentResponseMsg {
repeated string data = 1;
}
message GenericRepositoryRequestMsg {}
message GenericRepositoryResponseMsg {}
@ -774,6 +806,11 @@ message VersionControlResponseMsg {
string error = 3;
GenericRepositoryResponseMsg genericResponse = 4;
CommitResponseMsg commitResponse = 5;
ListBranchesResponseMsg listBranchesResponse = 6;
ListEntitiesResponseMsg listEntitiesResponse = 7;
ListVersionsResponseMsg listVersionsResponse = 8;
EntityContentResponseMsg entityContentResponse = 9;
EntitiesContentResponseMsg entitiesContentResponse = 10;
}
/**

View File

@ -16,10 +16,16 @@
package org.thingsboard.server.common.data.sync.vc;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.id.EntityId;
@Data
@NoArgsConstructor
public class VersionedEntityInfo {
private EntityId externalId;
// etc..
public VersionedEntityInfo(EntityId externalId) {
this.externalId = externalId;
}
}

View File

@ -23,9 +23,12 @@ import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -58,6 +61,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@TbVersionControlComponent
@ -144,9 +148,21 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
var newSettings = ctx.getSettings();
if (!newSettings.equals(currentSettings)) {
vcService.initRepository(ctx.getTenantId(), ctx.getSettings());
} else {
vcService.fetch(ctx.getTenantId());
}
if (msg.hasCommitRequest()) {
handleCommitRequest(ctx, msg.getCommitRequest());
} else if (msg.hasListBranchesRequest()) {
handleListBranches(ctx, msg.getListBranchesRequest());
} else if (msg.hasListEntitiesRequest()) {
handleListEntities(ctx, msg.getListEntitiesRequest());
} else if (msg.hasListVersionRequest()) {
handleListVersions(ctx, msg.getListVersionRequest());
} else if (msg.hasEntityContentRequest()) {
handleEntityContentRequest(ctx, msg.getEntityContentRequest());
} else if (msg.hasEntitiesContentRequest()) {
handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest());
}
}
}
@ -172,12 +188,71 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
log.info("TB Version Control request consumer stopped.");
}
private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg commitRequest) throws Exception {
private void handleEntitiesContentRequest(VersionControlRequestCtx ctx, EntitiesContentRequestMsg request) throws Exception {
var entityType = EntityType.valueOf(request.getEntityType());
String path = getRelativePath(entityType, null);
var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path)
.stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList());
var response = EntitiesContentResponseMsg.newBuilder();
for (VersionedEntityInfo info : ids) {
var data = vcService.getFileContentAtCommit(ctx.getTenantId(),
getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId());
response.addData(data);
}
reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response));
}
private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException {
String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString());
String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId());
reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder().setData(data)));
}
private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception {
String path;
if (StringUtils.isNotEmpty(request.getEntityType())) {
var entityType = EntityType.valueOf(request.getEntityType());
if (request.getEntityIdLSB() != 0 || request.getEntityIdMSB() != 0) {
path = getRelativePath(entityType, new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString());
} else {
path = getRelativePath(entityType, null);
}
} else {
path = null;
}
var data = vcService.listVersions(ctx.getTenantId(), request.getBranchName(), path);
reply(ctx, Optional.empty(), builder ->
builder.setListVersionsResponse(ListVersionsResponseMsg.newBuilder()
.addAllVersions(data.stream().map(
v -> EntityVersionProto.newBuilder().setId(v.getId()).setName(v.getName()).build()
).collect(Collectors.toList()))));
}
private void handleListEntities(VersionControlRequestCtx ctx, ListEntitiesRequestMsg request) throws Exception {
EntityType entityType = StringUtils.isNotEmpty(request.getEntityType()) ? EntityType.valueOf(request.getEntityType()) : null;
var path = entityType != null ? getRelativePath(entityType, null) : null;
var data = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path);
reply(ctx, Optional.empty(), builder ->
builder.setListEntitiesResponse(ListEntitiesResponseMsg.newBuilder()
.addAllEntities(data.stream().map(VersionedEntityInfo::getExternalId).map(
id -> VersionedEntityInfoProto.newBuilder()
.setEntityType(id.getEntityType().name())
.setEntityIdMSB(id.getId().getMostSignificantBits())
.setEntityIdLSB(id.getId().getLeastSignificantBits()).build()
).collect(Collectors.toList()))));
}
private void handleListBranches(VersionControlRequestCtx ctx, ListBranchesRequestMsg request) {
var branches = vcService.listBranches(ctx.getTenantId());
reply(ctx, Optional.empty(), builder -> builder.setListBranchesResponse(ListBranchesResponseMsg.newBuilder().addAllBranches(branches)));
}
private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception {
var tenantId = ctx.getTenantId();
UUID txId = UUID.fromString(commitRequest.getTxId());
if (commitRequest.hasPrepareMsg()) {
prepareCommit(ctx, txId, commitRequest.getPrepareMsg());
} else if (commitRequest.hasAbortMsg()) {
UUID txId = UUID.fromString(request.getTxId());
if (request.hasPrepareMsg()) {
prepareCommit(ctx, txId, request.getPrepareMsg());
} else if (request.hasAbortMsg()) {
PendingCommit current = pendingCommitMap.get(tenantId);
if (current != null && current.getTxId().equals(txId)) {
doAbortCurrentCommit(tenantId, current);
@ -186,11 +261,11 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
PendingCommit current = pendingCommitMap.get(tenantId);
if (current != null && current.getTxId().equals(txId)) {
try {
if (commitRequest.hasAddMsg()) {
addToCommit(ctx, current, commitRequest.getAddMsg());
} else if (commitRequest.hasDeleteMsg()) {
deleteFromCommit(ctx, current, commitRequest.getDeleteMsg());
} else if (commitRequest.hasPushMsg()) {
if (request.hasAddMsg()) {
addToCommit(ctx, current, request.getAddMsg());
} else if (request.hasDeleteMsg()) {
deleteFromCommit(ctx, current, request.getDeleteMsg());
} else if (request.hasPushMsg()) {
reply(ctx, vcService.push(current));
}
} catch (Exception e) {
@ -198,7 +273,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
throw e;
}
} else {
log.debug("[{}] Ignore request due to stale commit: {}", txId, commitRequest);
log.debug("[{}] Ignore request due to stale commit: {}", txId, request);
}
}
}
@ -290,7 +365,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
builder.setGenericResponse(TransportProtos.GenericRepositoryResponseMsg.newBuilder().build());
}
ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setVcResponseMsg(builder).build();
log.trace("PUSHING msg: {} to: {}", msg, tpi);
log.trace("[{}][{}] PUSHING reply: {} to: {}", ctx.getTenantId(), ctx.getRequestId(), msg, tpi);
producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
}
@ -304,9 +379,15 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
}
}
private String getRelativePath(EntityType entityType, String entityId) {
String path = entityType.name().toLowerCase();
if (entityId != null) {
path += "/" + entityId + ".json";
}
return path;
}
private Lock getRepoLock(TenantId tenantId) {
return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock(true));
}
}

View File

@ -160,6 +160,11 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
//TODO: implement;
}
@Override
public void fetch(TenantId tenantId) {
//Fetch latest changes on demand.
}
@Override
public String getFileContentAtCommit(TenantId tenantId, String relativePath, String versionId) throws IOException {
GitRepository repository = checkRepository(tenantId);
@ -197,9 +202,8 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
}
@Override
public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, String path) throws Exception {
public List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String versionId, String path) throws Exception {
GitRepository repository = checkRepository(tenantId);
checkVersion(tenantId, branch, versionId);
return repository.listFilesAtCommit(versionId, path).stream()
.map(filePath -> {
EntityId entityId = fromRelativePath(filePath);

View File

@ -30,7 +30,7 @@ public interface GitRepositoryService {
List<EntityVersion> listVersions(TenantId tenantId, String branch, String path) throws Exception;
List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, String path) throws Exception;
List<VersionedEntityInfo> listEntitiesAtVersion(TenantId tenantId, String versionId, String path) throws Exception;
void testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) throws Exception;
@ -51,4 +51,6 @@ public interface GitRepositoryService {
List<String> listBranches(TenantId tenantId);
String getFileContentAtCommit(TenantId tenantId, String relativePath, String versionId) throws IOException;
void fetch(TenantId tenantId);
}