Merge pull request #6689 from ViacheslavKlimov/vc-improvements
VC improvements
This commit is contained in:
commit
1d21ee52d9
@ -29,6 +29,14 @@ ALTER TABLE customer
|
||||
ALTER TABLE widgets_bundle
|
||||
ADD COLUMN IF NOT EXISTS external_id UUID;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_device_profile_external_id ON device_profile(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asset_external_id ON asset(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_chain_external_id ON rule_chain(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_dashboard_external_id ON dashboard(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_customer_external_id ON customer(tenant_id, external_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_widgets_bundle_external_id ON widgets_bundle(tenant_id, external_id);
|
||||
|
||||
ALTER TABLE admin_settings
|
||||
ADD COLUMN IF NOT EXISTS tenant_id uuid NOT NULL DEFAULT '13814000-1dd2-11b2-8080-808080808080';
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
@ -34,9 +35,9 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityVersionsDiff;
|
||||
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
||||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
|
||||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
|
||||
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
|
||||
@ -53,6 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionControlRespon
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest;
|
||||
@ -67,7 +69,13 @@ import org.thingsboard.server.service.sync.vc.data.PendingGitRequest;
|
||||
import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest;
|
||||
import org.thingsboard.server.service.sync.vc.data.VoidGitRequest;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
@ -81,16 +89,22 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
|
||||
private final TbClusterService clusterService;
|
||||
private final DataDecodingEncodingService encodingService;
|
||||
private final DefaultEntitiesVersionControlService entitiesVersionControlService;
|
||||
private final SchedulerComponent scheduler;
|
||||
|
||||
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>();
|
||||
|
||||
@Value("${queue.vc.request-timeout:60000}")
|
||||
private int requestTimeout;
|
||||
|
||||
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
|
||||
DataDecodingEncodingService encodingService,
|
||||
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService) {
|
||||
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService,
|
||||
SchedulerComponent scheduler) {
|
||||
this.serviceInfoProvider = serviceInfoProvider;
|
||||
this.clusterService = clusterService;
|
||||
this.encodingService = encodingService;
|
||||
this.entitiesVersionControlService = entitiesVersionControlService;
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -275,6 +289,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
|
||||
var requestBody = enrichFunction.apply(newRequestProto(request, settings));
|
||||
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
|
||||
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
|
||||
request.setTimeoutTask(scheduler.schedule(() -> {
|
||||
processTimeout(request.getRequestId());
|
||||
}, requestTimeout, TimeUnit.MILLISECONDS));
|
||||
} else {
|
||||
throw new RuntimeException("Future is already done!");
|
||||
}
|
||||
@ -338,12 +355,13 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
|
||||
@Override
|
||||
public void processResponse(VersionControlResponseMsg vcResponseMsg) {
|
||||
UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB());
|
||||
PendingGitRequest<?> request = pendingRequestMap.get(requestId);
|
||||
PendingGitRequest<?> request = pendingRequestMap.remove(requestId);
|
||||
if (request == null) {
|
||||
log.debug("[{}] received stale response: {}", requestId, vcResponseMsg);
|
||||
return;
|
||||
} else {
|
||||
log.debug("[{}] processing response: {}", requestId, vcResponseMsg);
|
||||
request.getTimeoutTask().cancel(true);
|
||||
}
|
||||
var future = request.getFuture();
|
||||
if (!StringUtils.isEmpty(vcResponseMsg.getError())) {
|
||||
@ -399,6 +417,14 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
|
||||
}
|
||||
}
|
||||
|
||||
private void processTimeout(UUID requestId) {
|
||||
PendingGitRequest<?> pendingRequest = pendingRequestMap.remove(requestId);
|
||||
if (pendingRequest != null) {
|
||||
log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout);
|
||||
pendingRequest.getFuture().setException(new TimeoutException("Request timed out"));
|
||||
}
|
||||
}
|
||||
|
||||
private PageData<EntityVersion> toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) {
|
||||
var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList());
|
||||
return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext());
|
||||
|
||||
@ -72,7 +72,7 @@ public abstract class TbAbstractVersionControlSettingsService<T extends Serializ
|
||||
}
|
||||
|
||||
public boolean delete(TenantId tenantId) {
|
||||
boolean result = adminSettingsService.deleteAdminSettings(tenantId, settingsKey);
|
||||
boolean result = adminSettingsService.deleteAdminSettingsByTenantIdAndKey(tenantId, settingsKey);
|
||||
cache.evict(tenantId);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -17,9 +17,11 @@ package org.thingsboard.server.service.sync.vc.data;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
@Getter
|
||||
public class PendingGitRequest<T> {
|
||||
@ -28,6 +30,8 @@ public class PendingGitRequest<T> {
|
||||
private final UUID requestId;
|
||||
private final TenantId tenantId;
|
||||
private final SettableFuture<T> future;
|
||||
@Setter
|
||||
private ScheduledFuture<?> timeoutTask;
|
||||
|
||||
public PendingGitRequest(TenantId tenantId) {
|
||||
this.createdTime = System.currentTimeMillis();
|
||||
|
||||
@ -1039,6 +1039,7 @@ queue:
|
||||
partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
|
||||
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
|
||||
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
|
||||
request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}"
|
||||
js:
|
||||
# JS Eval request topic
|
||||
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
|
||||
|
||||
@ -29,6 +29,8 @@ public interface AdminSettingsService {
|
||||
|
||||
AdminSettings saveAdminSettings(TenantId tenantId, AdminSettings adminSettings);
|
||||
|
||||
boolean deleteAdminSettings(TenantId tenantId, String key);
|
||||
boolean deleteAdminSettingsByTenantIdAndKey(TenantId tenantId, String key);
|
||||
|
||||
void deleteAdminSettingsByTenantId(TenantId tenantId);
|
||||
|
||||
}
|
||||
|
||||
@ -41,4 +41,6 @@ public interface AdminSettingsDao extends Dao<AdminSettings> {
|
||||
|
||||
boolean removeByTenantIdAndKey(UUID tenantId, String key);
|
||||
|
||||
void removeByTenantId(UUID tenantId);
|
||||
|
||||
}
|
||||
|
||||
@ -71,9 +71,15 @@ public class AdminSettingsServiceImpl implements AdminSettingsService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAdminSettings(TenantId tenantId, String key) {
|
||||
public boolean deleteAdminSettingsByTenantIdAndKey(TenantId tenantId, String key) {
|
||||
log.trace("Executing deleteAdminSettings, tenantId [{}], key [{}]", tenantId, key);
|
||||
Validator.validateString(key, "Incorrect key " + key);
|
||||
return adminSettingsDao.removeByTenantIdAndKey(tenantId.getId(), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAdminSettingsByTenantId(TenantId tenantId) {
|
||||
adminSettingsDao.removeByTenantId(tenantId.getId());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -29,6 +29,8 @@ public interface AdminSettingsRepository extends JpaRepository<AdminSettingsEnti
|
||||
|
||||
void deleteByTenantIdAndKey(UUID tenantId, String key);
|
||||
|
||||
void deleteByTenantId(UUID tenantId);
|
||||
|
||||
boolean existsByTenantIdAndKey(UUID tenantId, String key);
|
||||
|
||||
}
|
||||
|
||||
@ -60,4 +60,10 @@ public class JpaAdminSettingsDao extends JpaAbstractDao<AdminSettingsEntity, Adm
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeByTenantId(UUID tenantId) {
|
||||
adminSettingsRepository.deleteByTenantId(tenantId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -41,6 +41,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.service.PaginatedRemover;
|
||||
import org.thingsboard.server.dao.service.Validator;
|
||||
import org.thingsboard.server.dao.settings.AdminSettingsService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
@ -107,6 +108,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
|
||||
@Autowired
|
||||
private QueueService queueService;
|
||||
|
||||
@Autowired
|
||||
private AdminSettingsService adminSettingsService;
|
||||
|
||||
@Override
|
||||
public Tenant findTenantById(TenantId tenantId) {
|
||||
log.trace("Executing findTenantById [{}]", tenantId);
|
||||
@ -164,6 +168,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
|
||||
otaPackageService.deleteOtaPackagesByTenantId(tenantId);
|
||||
rpcService.deleteAllRpcByTenantId(tenantId);
|
||||
queueService.deleteQueuesByTenantId(tenantId);
|
||||
adminSettingsService.deleteAdminSettingsByTenantId(tenantId);
|
||||
tenantDao.removeById(tenantId, tenantId.getId());
|
||||
deleteEntityRelations(tenantId, tenantId);
|
||||
}
|
||||
|
||||
@ -51,3 +51,17 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_device_profile_external_id ON device_profile(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_asset_external_id ON asset(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rule_chain_external_id ON rule_chain(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_dashboard_external_id ON dashboard(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_customer_external_id ON customer(tenant_id, external_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_widgets_bundle_external_id ON widgets_bundle(tenant_id, external_id);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user