From 1065fd9fbebec782ed7fddc268f26d712cd10917 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 24 May 2022 11:38:03 +0300 Subject: [PATCH] Implementation of the API --- .../DefaultGitVersionControlQueueService.java | 49 ++- .../vc/LocalGitVersionControlService.java | 290 ------------------ .../service/sync/vc/PendingGitRequest.java | 4 +- common/cluster-api/src/main/proto/queue.proto | 37 +++ .../data/sync/vc/VersionedEntityInfo.java | 6 + .../DefaultClusterVersionControlService.java | 109 ++++++- .../sync/vc/DefaultGitRepositoryService.java | 8 +- .../service/sync/vc/GitRepositoryService.java | 4 +- 8 files changed, 188 insertions(+), 319 deletions(-) delete mode 100644 application/src/main/java/org/thingsboard/server/service/sync/vc/LocalGitVersionControlService.java diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 7ffcbb9c14..0d3f2a4431 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -21,17 +21,16 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; -import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ExportableEntity; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; @@ -61,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; @TbCoreComponent @Service @@ -209,6 +209,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu } @Override + @SuppressWarnings("rawtypes") public ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId) { EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() @@ -218,14 +219,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .setEntityIdLSB(entityId.getId().getLeastSignificantBits())).build() , wrap(request.getFuture())); return request.getFuture(); -// try { -// String entityDataJson = gitRepositoryService.getFileContentAtCommit(tenantId, -// getRelativePath(entityId.getEntityType(), entityId.getId().toString()), versionId); -// return JacksonUtil.fromString(entityDataJson, EntityExportData.class); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } } private void registerAndSend(PendingGitRequest request, @@ -237,13 +230,16 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu Function enrichFunction, EntitiesVersionControlSettings settings, TbQueueCallback callback) { if (!request.getFuture().isDone()) { pendingRequestMap.putIfAbsent(request.getRequestId(), request); - clusterService.pushMsgToVersionControl(request.getTenantId(), enrichFunction.apply(newRequestProto(request, settings)), callback); + var requestBody = enrichFunction.apply(newRequestProto(request, settings)); + log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); + clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); } else { throw new RuntimeException("Future is already done!"); } } @Override + @SuppressWarnings("rawtypes") public ListenableFuture> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); @@ -313,10 +309,41 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu commitResult.setRemoved(commitResponse.getRemoved()); commitResult.setModified(commitResponse.getModified()); ((CommitGitRequest) request).getFuture().set(commitResult); + } else if (vcResponseMsg.hasListBranchesResponse()) { + var listBranchesResponse = vcResponseMsg.getListBranchesResponse(); + ((ListBranchesGitRequest) request).getFuture().set(listBranchesResponse.getBranchesList()); + } else if (vcResponseMsg.hasListEntitiesResponse()) { + var listEntitiesResponse = vcResponseMsg.getListEntitiesResponse(); + ((ListEntitiesGitRequest) request).getFuture().set( + listEntitiesResponse.getEntitiesList().stream().map(this::getVersionedEntityInfo).collect(Collectors.toList())); + } else if (vcResponseMsg.hasListVersionsResponse()) { + var listVersionsResponse = vcResponseMsg.getListVersionsResponse(); + ((ListVersionsGitRequest) request).getFuture().set( + listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList())); + } else if (vcResponseMsg.hasEntityContentResponse()) { + var data = vcResponseMsg.getEntityContentResponse().getData(); + ((EntityContentGitRequest) request).getFuture().set(toData(data)); + } else if (vcResponseMsg.hasEntitiesContentResponse()) { + var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList(); + ((EntitiesContentGitRequest) request).getFuture() + .set(dataList.stream().map(this::toData).collect(Collectors.toList())); } } } + private EntityVersion getEntityVersion(TransportProtos.EntityVersionProto proto) { + return new EntityVersion(proto.getId(), proto.getName()); + } + + private VersionedEntityInfo getVersionedEntityInfo(TransportProtos.VersionedEntityInfoProto proto) { + return new VersionedEntityInfo(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()))); + } + + @SuppressWarnings("rawtypes") + private EntityExportData toData(String data) { + return JacksonUtil.fromString(data, EntityExportData.class); + } + private static TbQueueCallback wrap(SettableFuture future) { return new TbQueueCallback() { @Override diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/LocalGitVersionControlService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/LocalGitVersionControlService.java deleted file mode 100644 index d995472531..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/LocalGitVersionControlService.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.sync.vc; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SerializationFeature; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.AdminSettings; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.ExportableEntity; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.sync.ie.EntityExportData; -import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; -import org.thingsboard.server.common.data.sync.vc.EntityVersion; -import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; -import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; -import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; -import org.thingsboard.server.dao.DaoUtil; -import org.thingsboard.server.dao.settings.AdminSettingsService; -import org.thingsboard.server.dao.tenant.TenantDao; -import org.thingsboard.server.queue.util.AfterStartUp; - -import java.io.IOException; -import java.util.ConcurrentModificationException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -@Slf4j -@RequiredArgsConstructor -@Service -@ConditionalOnProperty(prefix = "vc", value = "git.service", havingValue = "local", matchIfMissing = true) -public class LocalGitVersionControlService { - - private final ObjectWriter jsonWriter = new ObjectMapper().writer(SerializationFeature.INDENT_OUTPUT); - private final GitRepositoryService gitRepositoryService; - private final TenantDao tenantDao; - private final AdminSettingsService adminSettingsService; - private final ConcurrentMap tenantRepoLocks = new ConcurrentHashMap<>(); - private final Map pendingCommitMap = new HashMap<>(); -// -// @AfterStartUp -// public void init() { -// DaoUtil.processInBatches(tenantDao::findTenantsIds, 100, tenantId -> { -// EntitiesVersionControlSettings settings = getVersionControlSettings(tenantId); -// if (settings != null) { -// try { -// gitRepositoryService.initRepository(tenantId, settings); -// } catch (Exception e) { -// log.warn("Failed to init repository for tenant {}", tenantId, e); -// } -// } -// }); -// } -// -// @Override -// public void testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { -// var lock = getRepoLock(tenantId); -// lock.lock(); -// try { -// gitRepositoryService.testRepository(tenantId, settings); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } finally { -// lock.unlock(); -// } -// } -// -// @Override -// public void initRepository(TenantId tenantId, EntitiesVersionControlSettings settings) { -// var lock = getRepoLock(tenantId); -// lock.lock(); -// try { -// gitRepositoryService.initRepository(tenantId, settings); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } finally { -// lock.unlock(); -// } -// } -// -// @Override -// public void clearRepository(TenantId tenantId) { -// var lock = getRepoLock(tenantId); -// lock.lock(); -// try { -// gitRepositoryService.clearRepository(tenantId); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } finally { -// lock.unlock(); -// } -// } -// -// @Override -// public PendingCommit prepareCommit(TenantId tenantId, VersionCreateRequest request) { -// var lock = getRepoLock(tenantId); -// lock.lock(); -// try { -// var pendingCommit = new PendingCommit(tenantId, request); -// PendingCommit old = pendingCommitMap.put(tenantId, pendingCommit); -// if (old != null) { -// gitRepositoryService.abort(old); -// } -// gitRepositoryService.prepareCommit(pendingCommit); -// return pendingCommit; -// } finally { -// lock.unlock(); -// } -// } -// -// @Override -// public void deleteAll(PendingCommit commit, EntityType entityType) { -// doInsideLock(commit, c -> { -// try { -// gitRepositoryService.deleteFolderContent(commit, getRelativePath(entityType, null)); -// } catch (IOException e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } -// }); -// } -// -// @Override -// public void addToCommit(PendingCommit commit, EntityExportData> entityData) { -// doInsideLock(commit, c -> { -// String entityDataJson; -// try { -// entityDataJson = jsonWriter.writeValueAsString(entityData); -// gitRepositoryService.add(c, getRelativePath(entityData.getEntityType(), -// entityData.getEntity().getId().toString()), entityDataJson); -// } catch (IOException e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } -// }); -// } -// -// @Override -// public VersionCreationResult push(PendingCommit commit) { -// return executeInsideLock(commit, gitRepositoryService::push); -// } -// -// @Override -// public List listVersions(TenantId tenantId, String branch) { -// return listVersions(tenantId, branch, (String) null); -// } -// -// @Override -// public List listVersions(TenantId tenantId, String branch, EntityType entityType) { -// return listVersions(tenantId, branch, getRelativePath(entityType, null)); -// } -// -// @Override -// public List listVersions(TenantId tenantId, String branch, EntityId entityId) { -// return listVersions(tenantId, branch, getRelativePath(entityId.getEntityType(), entityId.getId().toString())); -// } -// -// @Override -// public List listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { -// try { -// return gitRepositoryService.listEntitiesAtVersion(tenantId, branch, versionId, entityType != null ? getRelativePath(entityType, null) : null); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } -// } -// -// @Override -// public List listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { -// return listEntitiesAtVersion(tenantId, branch, versionId, null); -// } -// -// @Override -// public List listBranches(TenantId tenantId) { -// return gitRepositoryService.listBranches(tenantId); -// } -// -// @Override -// public List> getEntities(TenantId tenantId, String branch, String versionId, EntityType entityType, int offset, int limit) { -// return listEntitiesAtVersion(tenantId, branch, versionId, entityType).stream() -// .skip(offset).limit(limit) -// .map(entityInfo -> getEntity(tenantId, versionId, entityInfo.getExternalId())) -// .collect(Collectors.toList()); -// } -// -// @Override -// public EntityExportData getEntity(TenantId tenantId, String versionId, EntityId entityId) { -// try { -// String entityDataJson = gitRepositoryService.getFileContentAtCommit(tenantId, -// getRelativePath(entityId.getEntityType(), entityId.getId().toString()), versionId); -// return JacksonUtil.fromString(entityDataJson, EntityExportData.class); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } -// } -// -// private EntitiesVersionControlSettings getVersionControlSettings(TenantId tenantId) { -// AdminSettings adminSettings = adminSettingsService.findAdminSettingsByKey(tenantId, EntitiesVersionControlService.SETTINGS_KEY); -// if (adminSettings != null) { -// try { -// return JacksonUtil.convertValue(adminSettings.getJsonValue(), EntitiesVersionControlSettings.class); -// } catch (Exception e) { -// throw new RuntimeException("Failed to load version control settings!", e); -// } -// } -// return null; -// } -// -// private List listVersions(TenantId tenantId, String branch, String path) { -// try { -// return gitRepositoryService.listVersions(tenantId, branch, path); -// } catch (Exception e) { -// //TODO: analyze and return meaningful exceptions that we can show to the client; -// throw new RuntimeException(e); -// } -// } -// -// private void doInsideLock(PendingCommit commit, Consumer r) { -// var lock = getRepoLock(commit.getTenantId()); -// lock.lock(); -// try { -// checkCommit(commit); -// r.accept(commit); -// } finally { -// lock.unlock(); -// } -// } -// -// private T executeInsideLock(PendingCommit commit, Function c) { -// var lock = getRepoLock(commit.getTenantId()); -// lock.lock(); -// try { -// checkCommit(commit); -// return c.apply(commit); -// } finally { -// lock.unlock(); -// } -// } -// -// private void checkCommit(PendingCommit commit) { -// PendingCommit existing = pendingCommitMap.get(commit.getTenantId()); -// if (existing == null || !existing.getRequestId().equals(commit.getRequestId())) { -// throw new ConcurrentModificationException(); -// } -// } -// -// private String getRelativePath(EntityType entityType, String entityId) { -// String path = entityType.name().toLowerCase(); -// if (entityId != null) { -// path += "/" + entityId + ".json"; -// } -// return path; -// } -// -// private Lock getRepoLock(TenantId tenantId) { -// return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock()); -// } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/PendingGitRequest.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/PendingGitRequest.java index 9b612cee5b..e63ba04f45 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/PendingGitRequest.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/PendingGitRequest.java @@ -24,17 +24,19 @@ import java.util.UUID; @Getter public class PendingGitRequest { + private final long createdTime; private final UUID requestId; private final TenantId tenantId; private final SettableFuture future; public PendingGitRequest(TenantId tenantId) { + this.createdTime = System.currentTimeMillis(); this.requestId = UUID.randomUUID(); this.tenantId = tenantId; this.future = SettableFuture.create(); } - public boolean requiresSettings(){ + public boolean requiresSettings() { return true; } } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 6d45698451..54e46eebcc 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -723,15 +723,39 @@ message ListVersionsRequestMsg { int64 entityIdLSB = 4; } +message EntityVersionProto { + int64 ts = 1; + string id = 2; + string name = 3; +} + +message ListVersionsResponseMsg { + repeated EntityVersionProto versions = 1; +} + message ListEntitiesRequestMsg { string branchName = 1; string versionId = 2; string entityType = 3; } +message VersionedEntityInfoProto { + string entityType = 1; + int64 entityIdMSB = 2; + int64 entityIdLSB = 3; +} + +message ListEntitiesResponseMsg { + repeated VersionedEntityInfoProto entities = 1; +} + message ListBranchesRequestMsg { } +message ListBranchesResponseMsg { + repeated string branches = 1; +} + message EntityContentRequestMsg { string versionId = 1; string entityType = 2; @@ -739,6 +763,10 @@ message EntityContentRequestMsg { int64 entityIdLSB = 4; } +message EntityContentResponseMsg { + string data = 1; +} + message EntitiesContentRequestMsg { string versionId = 1; string entityType = 2; @@ -746,6 +774,10 @@ message EntitiesContentRequestMsg { int32 limit = 4; } +message EntitiesContentResponseMsg { + repeated string data = 1; +} + message GenericRepositoryRequestMsg {} message GenericRepositoryResponseMsg {} @@ -774,6 +806,11 @@ message VersionControlResponseMsg { string error = 3; GenericRepositoryResponseMsg genericResponse = 4; CommitResponseMsg commitResponse = 5; + ListBranchesResponseMsg listBranchesResponse = 6; + ListEntitiesResponseMsg listEntitiesResponse = 7; + ListVersionsResponseMsg listVersionsResponse = 8; + EntityContentResponseMsg entityContentResponse = 9; + EntitiesContentResponseMsg entitiesContentResponse = 10; } /** diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java index 163fe4c6d2..fd278cde1f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java @@ -16,10 +16,16 @@ package org.thingsboard.server.common.data.sync.vc; import lombok.Data; +import lombok.NoArgsConstructor; import org.thingsboard.server.common.data.id.EntityId; @Data +@NoArgsConstructor public class VersionedEntityInfo { private EntityId externalId; // etc.. + + public VersionedEntityInfo(EntityId externalId) { + this.externalId = externalId; + } } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index b9c595beac..b282799e24 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -23,9 +23,12 @@ import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.sync.vc.EntitiesVersionControlSettings; import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; +import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -58,6 +61,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j @TbVersionControlComponent @@ -144,9 +148,21 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe var newSettings = ctx.getSettings(); if (!newSettings.equals(currentSettings)) { vcService.initRepository(ctx.getTenantId(), ctx.getSettings()); + } else { + vcService.fetch(ctx.getTenantId()); } if (msg.hasCommitRequest()) { handleCommitRequest(ctx, msg.getCommitRequest()); + } else if (msg.hasListBranchesRequest()) { + handleListBranches(ctx, msg.getListBranchesRequest()); + } else if (msg.hasListEntitiesRequest()) { + handleListEntities(ctx, msg.getListEntitiesRequest()); + } else if (msg.hasListVersionRequest()) { + handleListVersions(ctx, msg.getListVersionRequest()); + } else if (msg.hasEntityContentRequest()) { + handleEntityContentRequest(ctx, msg.getEntityContentRequest()); + } else if (msg.hasEntitiesContentRequest()) { + handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest()); } } } @@ -172,12 +188,71 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe log.info("TB Version Control request consumer stopped."); } - private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg commitRequest) throws Exception { + private void handleEntitiesContentRequest(VersionControlRequestCtx ctx, EntitiesContentRequestMsg request) throws Exception { + var entityType = EntityType.valueOf(request.getEntityType()); + String path = getRelativePath(entityType, null); + var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path) + .stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList()); + var response = EntitiesContentResponseMsg.newBuilder(); + for (VersionedEntityInfo info : ids) { + var data = vcService.getFileContentAtCommit(ctx.getTenantId(), + getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); + response.addData(data); + } + reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response)); + } + + private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { + String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); + String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); + reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder().setData(data))); + } + + private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception { + String path; + if (StringUtils.isNotEmpty(request.getEntityType())) { + var entityType = EntityType.valueOf(request.getEntityType()); + if (request.getEntityIdLSB() != 0 || request.getEntityIdMSB() != 0) { + path = getRelativePath(entityType, new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); + } else { + path = getRelativePath(entityType, null); + } + } else { + path = null; + } + var data = vcService.listVersions(ctx.getTenantId(), request.getBranchName(), path); + reply(ctx, Optional.empty(), builder -> + builder.setListVersionsResponse(ListVersionsResponseMsg.newBuilder() + .addAllVersions(data.stream().map( + v -> EntityVersionProto.newBuilder().setId(v.getId()).setName(v.getName()).build() + ).collect(Collectors.toList())))); + } + + private void handleListEntities(VersionControlRequestCtx ctx, ListEntitiesRequestMsg request) throws Exception { + EntityType entityType = StringUtils.isNotEmpty(request.getEntityType()) ? EntityType.valueOf(request.getEntityType()) : null; + var path = entityType != null ? getRelativePath(entityType, null) : null; + var data = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path); + reply(ctx, Optional.empty(), builder -> + builder.setListEntitiesResponse(ListEntitiesResponseMsg.newBuilder() + .addAllEntities(data.stream().map(VersionedEntityInfo::getExternalId).map( + id -> VersionedEntityInfoProto.newBuilder() + .setEntityType(id.getEntityType().name()) + .setEntityIdMSB(id.getId().getMostSignificantBits()) + .setEntityIdLSB(id.getId().getLeastSignificantBits()).build() + ).collect(Collectors.toList())))); + } + + private void handleListBranches(VersionControlRequestCtx ctx, ListBranchesRequestMsg request) { + var branches = vcService.listBranches(ctx.getTenantId()); + reply(ctx, Optional.empty(), builder -> builder.setListBranchesResponse(ListBranchesResponseMsg.newBuilder().addAllBranches(branches))); + } + + private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception { var tenantId = ctx.getTenantId(); - UUID txId = UUID.fromString(commitRequest.getTxId()); - if (commitRequest.hasPrepareMsg()) { - prepareCommit(ctx, txId, commitRequest.getPrepareMsg()); - } else if (commitRequest.hasAbortMsg()) { + UUID txId = UUID.fromString(request.getTxId()); + if (request.hasPrepareMsg()) { + prepareCommit(ctx, txId, request.getPrepareMsg()); + } else if (request.hasAbortMsg()) { PendingCommit current = pendingCommitMap.get(tenantId); if (current != null && current.getTxId().equals(txId)) { doAbortCurrentCommit(tenantId, current); @@ -186,11 +261,11 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe PendingCommit current = pendingCommitMap.get(tenantId); if (current != null && current.getTxId().equals(txId)) { try { - if (commitRequest.hasAddMsg()) { - addToCommit(ctx, current, commitRequest.getAddMsg()); - } else if (commitRequest.hasDeleteMsg()) { - deleteFromCommit(ctx, current, commitRequest.getDeleteMsg()); - } else if (commitRequest.hasPushMsg()) { + if (request.hasAddMsg()) { + addToCommit(ctx, current, request.getAddMsg()); + } else if (request.hasDeleteMsg()) { + deleteFromCommit(ctx, current, request.getDeleteMsg()); + } else if (request.hasPushMsg()) { reply(ctx, vcService.push(current)); } } catch (Exception e) { @@ -198,7 +273,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe throw e; } } else { - log.debug("[{}] Ignore request due to stale commit: {}", txId, commitRequest); + log.debug("[{}] Ignore request due to stale commit: {}", txId, request); } } } @@ -290,7 +365,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe builder.setGenericResponse(TransportProtos.GenericRepositoryResponseMsg.newBuilder().build()); } ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setVcResponseMsg(builder).build(); - log.trace("PUSHING msg: {} to: {}", msg, tpi); + log.trace("[{}][{}] PUSHING reply: {} to: {}", ctx.getTenantId(), ctx.getRequestId(), msg, tpi); producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); } @@ -304,9 +379,15 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } } + private String getRelativePath(EntityType entityType, String entityId) { + String path = entityType.name().toLowerCase(); + if (entityId != null) { + path += "/" + entityId + ".json"; + } + return path; + } + private Lock getRepoLock(TenantId tenantId) { return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock(true)); } - - } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java index 0d55783a31..758641f6eb 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java @@ -160,6 +160,11 @@ public class DefaultGitRepositoryService implements GitRepositoryService { //TODO: implement; } + @Override + public void fetch(TenantId tenantId) { + //Fetch latest changes on demand. + } + @Override public String getFileContentAtCommit(TenantId tenantId, String relativePath, String versionId) throws IOException { GitRepository repository = checkRepository(tenantId); @@ -197,9 +202,8 @@ public class DefaultGitRepositoryService implements GitRepositoryService { } @Override - public List listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, String path) throws Exception { + public List listEntitiesAtVersion(TenantId tenantId, String versionId, String path) throws Exception { GitRepository repository = checkRepository(tenantId); - checkVersion(tenantId, branch, versionId); return repository.listFilesAtCommit(versionId, path).stream() .map(filePath -> { EntityId entityId = fromRelativePath(filePath); diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java index c748147856..54ab91582e 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepositoryService.java @@ -30,7 +30,7 @@ public interface GitRepositoryService { List listVersions(TenantId tenantId, String branch, String path) throws Exception; - List listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, String path) throws Exception; + List listEntitiesAtVersion(TenantId tenantId, String versionId, String path) throws Exception; void testRepository(TenantId tenantId, EntitiesVersionControlSettings settings) throws Exception; @@ -51,4 +51,6 @@ public interface GitRepositoryService { List listBranches(TenantId tenantId); String getFileContentAtCommit(TenantId tenantId, String relativePath, String versionId) throws IOException; + + void fetch(TenantId tenantId); }