Delete tenant entities in batches with id offset

This commit is contained in:
ViacheslavKlimov 2024-04-11 17:18:47 +03:00
parent feb629f2c2
commit 9064bfaa76
15 changed files with 184 additions and 84 deletions

View File

@ -25,9 +25,9 @@ import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.alarm.AlarmService;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@ -46,13 +46,18 @@ public class AlarmsDeletionTaskProcessor extends HousekeeperTaskProcessor<Alarms
if (entityType == EntityType.DEVICE || entityType == EntityType.ASSET) {
if (task.getAlarms() == null) {
DaoUtil.iterateWithKeyOffset(pageLink -> {
return alarmService.findAlarmIdsByOriginatorId(tenantId, entityId, pageLink);
}, 128, alarms -> {
AlarmId last = null;
while (true) {
List<AlarmId> alarms = alarmService.findAlarmIdsByOriginatorIdAndIdOffset(tenantId, entityId, last, 128);
if (alarms.isEmpty()) {
break;
}
housekeeperClient.submitTask(new AlarmsDeletionHousekeeperTask(tenantId, entityId, alarms.stream()
.map(UUIDBased::getId).collect(Collectors.toList())));
last = alarms.get(alarms.size() - 1);
log.debug("[{}][{}][{}] Submitted task for deleting {} alarms", tenantId, entityType, entityId, alarms.size());
});
}
} else {
for (UUID alarmId : task.getAlarms()) {
alarmService.delAlarm(tenantId, new AlarmId(alarmId));

View File

@ -16,22 +16,59 @@
package org.thingsboard.server.service.housekeeper.processor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.housekeeper.EntitiesDeletionHousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.entity.EntityDaoRegistry;
import org.thingsboard.server.dao.entity.EntityDaoService;
import org.thingsboard.server.dao.entity.EntityServiceRegistry;
import java.util.List;
import java.util.UUID;
@Component
@RequiredArgsConstructor
@Slf4j
public class EntitiesDeletionTaskProcessor extends HousekeeperTaskProcessor<EntitiesDeletionHousekeeperTask> {
private final EntityDaoRegistry entityDaoRegistry;
private final EntityServiceRegistry entityServiceRegistry;
@Override
public void process(EntitiesDeletionHousekeeperTask task) throws Exception {
EntityDaoService entityService = entityServiceRegistry.getServiceByEntityType(task.getEntityType());
entityService.deleteByTenantId(task.getTenantId());
EntityType entityType = task.getEntityType();
TenantId tenantId = task.getTenantId();
Dao<?> entityDao = entityDaoRegistry.getDao(entityType);
EntityDaoService entityService = entityServiceRegistry.getServiceByEntityType(entityType);
if (task.getEntities() == null) {
UUID last = null;
while (true) {
List<UUID> entities = entityDao.findIdsByTenantIdAndIdOffset(tenantId, last, 128);
if (entities.isEmpty()) {
break;
}
housekeeperClient.submitTask(new EntitiesDeletionHousekeeperTask(tenantId, entityType, task.getEntities()));
last = entities.get(entities.size() - 1);
log.debug("[{}] Submitted task for deleting {} {}s", tenantId, entities.size(), entityType.getNormalName().toLowerCase());
}
} else {
for (UUID entityUuid : task.getEntities()) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, entityUuid);
// Optional<HasId<?>> entity = entityService.findEntity(tenantId, entityId);
// if (entity.isEmpty()) {
// continue;
// }
entityService.deleteEntity(tenantId, entityId);
}
}
}
@Override

View File

@ -368,7 +368,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
}
private void verifyNoAlarms(EntityId entityId) {
assertThat(alarmService.findAlarmIdsByOriginatorId(tenantId, entityId, new PageLink(1)).getData()).isEmpty();
assertThat(alarmService.findAlarmIdsByOriginatorIdAndIdOffset(tenantId, entityId, null, 10)).isEmpty();
}
private void createAttribute(EntityId entityId, AttributeScope scope, String key) throws Exception {
@ -410,7 +410,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
List<EntityAlarm> entityAlarms = alarmDao.findEntityAlarmRecords(tenantId, alarm.getId());
assertThat(entityAlarms).anyMatch(entityAlarm -> entityAlarm.getEntityId().equals(deviceId) && entityAlarm.getAlarmType().equals(alarm.getType()));
assertThat(entityAlarms).anyMatch(entityAlarm -> entityAlarm.getEntityId().equals(propagatedEntityId) && entityAlarm.getAlarmType().equals(alarm.getType()));
assertThat(alarmService.findAlarmIdsByOriginatorId(tenantId, deviceId, new PageLink(1)).getData()).isNotEmpty();
assertThat(alarmService.findAlarmIdsByOriginatorIdAndIdOffset(tenantId, deviceId, null, 10)).isNotEmpty();
}
private void createAlarm(DeviceId deviceId) {
@ -420,7 +420,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
.severity(AlarmSeverity.CRITICAL)
.type("test alarm for " + deviceId + " " + RandomStringUtils.randomAlphabetic(10))
.build(), Alarm.class);
assertThat(alarmService.findAlarmIdsByOriginatorId(tenantId, deviceId, new PageLink(1)).getData()).isNotEmpty();
assertThat(alarmService.findAlarmIdsByOriginatorIdAndIdOffset(tenantId, deviceId, null, 10)).isNotEmpty();
}
private TsKvEntry getLatestTelemetry(EntityId entityId) throws Exception {

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.dao.entity.EntityDaoService;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@ -106,7 +107,7 @@ public interface AlarmService extends EntityDaoService {
PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, PageLink pageLink);
PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, PageLink pageLink);
List<AlarmId> findAlarmIdsByOriginatorIdAndIdOffset(TenantId tenantId, EntityId originatorId, AlarmId idOffset, int limit);
int deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId);

View File

@ -20,7 +20,6 @@ import org.apache.commons.lang3.StringUtils;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
@ -28,51 +27,57 @@ import java.util.stream.Collectors;
public enum EntityType {
TENANT(1),
CUSTOMER(2),
USER(3),
USER(3, "tb_user"),
DASHBOARD(4),
ASSET(5),
DEVICE(6),
ALARM (7),
RULE_CHAIN (11),
RULE_NODE (12),
ENTITY_VIEW (15) {
ALARM(7),
RULE_CHAIN(11),
RULE_NODE(12),
ENTITY_VIEW(15) {
// backward compatibility for TbOriginatorTypeSwitchNode to return correct rule node connection.
@Override
public String getNormalName () {
public String getNormalName() {
return "Entity View";
}
},
WIDGETS_BUNDLE (16),
WIDGET_TYPE (17),
TENANT_PROFILE (20),
DEVICE_PROFILE (21),
ASSET_PROFILE (22),
API_USAGE_STATE (23),
TB_RESOURCE (24),
OTA_PACKAGE (25),
EDGE (26),
RPC (27),
QUEUE (28),
NOTIFICATION_TARGET (29),
NOTIFICATION_TEMPLATE (30),
NOTIFICATION_REQUEST (31),
NOTIFICATION (32),
NOTIFICATION_RULE (33),
WIDGETS_BUNDLE(16),
WIDGET_TYPE(17),
TENANT_PROFILE(20),
DEVICE_PROFILE(21),
ASSET_PROFILE(22),
API_USAGE_STATE(23),
TB_RESOURCE(24, "resource"),
OTA_PACKAGE(25),
EDGE(26),
RPC(27),
QUEUE(28),
NOTIFICATION_TARGET(29),
NOTIFICATION_TEMPLATE(30),
NOTIFICATION_REQUEST(31),
NOTIFICATION(32),
NOTIFICATION_RULE(33),
QUEUE_STATS(34);
@Getter
private final int protoNumber; // Corresponds to EntityTypeProto
private EntityType(int protoNumber) {
this.protoNumber = protoNumber;
}
public static final List<String> NORMAL_NAMES = EnumSet.allOf(EntityType.class).stream()
.map(EntityType::getNormalName).collect(Collectors.toUnmodifiableList());
@Getter
private final String tableName;
@Getter
private final String normalName = StringUtils.capitalize(StringUtils.removeStart(name(), "TB_")
.toLowerCase().replaceAll("_", " "));
public static final List<String> NORMAL_NAMES = EnumSet.allOf(EntityType.class).stream()
.map(EntityType::getNormalName).toList();
EntityType(int protoNumber) {
this.protoNumber = protoNumber;
this.tableName = name().toLowerCase();
}
EntityType(int protoNumber, String tableName) {
this.protoNumber = protoNumber;
this.tableName = tableName;
}
}

View File

@ -34,13 +34,13 @@ public class AlarmsDeletionHousekeeperTask extends HousekeeperTask {
private List<UUID> alarms;
public AlarmsDeletionHousekeeperTask(TenantId tenantId, EntityId entityId) {
this(tenantId, entityId, null);
}
public AlarmsDeletionHousekeeperTask(TenantId tenantId, EntityId entityId, List<UUID> alarms) {
super(tenantId, entityId, HousekeeperTaskType.DELETE_ALARMS);
this.alarms = alarms;
}
public AlarmsDeletionHousekeeperTask(TenantId tenantId, EntityId entityId) {
super(tenantId, entityId, HousekeeperTaskType.DELETE_ALARMS);
}
}

View File

@ -24,6 +24,9 @@ import lombok.ToString;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
import java.util.UUID;
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ -31,10 +34,16 @@ import org.thingsboard.server.common.data.id.TenantId;
public class EntitiesDeletionHousekeeperTask extends HousekeeperTask {
private EntityType entityType;
private List<UUID> entities;
protected EntitiesDeletionHousekeeperTask(TenantId tenantId, EntityType entityType) {
this(tenantId, entityType, null);
}
public EntitiesDeletionHousekeeperTask(TenantId tenantId, EntityType entityType, List<UUID> entities) {
super(tenantId, tenantId, HousekeeperTaskType.DELETE_ENTITIES);
this.entityType = entityType;
this.entities = entities;
}
@JsonIgnore

View File

@ -16,20 +16,15 @@
package org.thingsboard.server.common.data.page;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import org.springframework.data.domain.Sort;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@Data
@AllArgsConstructor
@Builder
public class PageLink {
protected static final String DEFAULT_SORT_PROPERTY = "id";
@ -39,7 +34,6 @@ public class PageLink {
private final int pageSize;
private final int page;
private final SortOrder sortOrder;
private UUID idOffset;
public PageLink(PageLink pageLink) {
this.pageSize = pageLink.getPageSize();
@ -69,10 +63,10 @@ public class PageLink {
@JsonIgnore
public PageLink nextPageLink() {
return new PageLink(this.pageSize, this.page+1, this.textSearch, this.sortOrder);
return new PageLink(this.pageSize, this.page + 1, this.textSearch, this.sortOrder);
}
public Sort toSort(SortOrder sortOrder, Map<String,String> columnMap, boolean addDefaultSorting) {
public Sort toSort(SortOrder sortOrder, Map<String, String> columnMap, boolean addDefaultSorting) {
if (sortOrder == null) {
return DEFAULT_SORT;
} else {
@ -80,7 +74,7 @@ public class PageLink {
}
}
public Sort toSort(List<SortOrder> sortOrders, Map<String,String> columnMap, boolean addDefaultSorting) {
public Sort toSort(List<SortOrder> sortOrders, Map<String, String> columnMap, boolean addDefaultSorting) {
if (addDefaultSorting && !isDefaultSortOrderAvailable(sortOrders)) {
sortOrders = new ArrayList<>(sortOrders);
sortOrders.add(new SortOrder(DEFAULT_SORT_PROPERTY, SortOrder.Direction.ASC));
@ -88,7 +82,7 @@ public class PageLink {
return Sort.by(sortOrders.stream().map(s -> toSortOrder(s, columnMap)).collect(Collectors.toList()));
}
private Sort.Order toSortOrder(SortOrder sortOrder, Map<String,String> columnMap) {
private Sort.Order toSortOrder(SortOrder sortOrder, Map<String, String> columnMap) {
String property = sortOrder.getProperty();
if (columnMap.containsKey(property)) {
property = columnMap.get(property);

View File

@ -43,6 +43,8 @@ public interface Dao<T> {
void removeAllByIds(Collection<UUID> ids);
List<UUID> findIdsByTenantIdAndIdOffset(TenantId tenantId, UUID idOffset, int limit);
default EntityType getEntityType() { return null; }
}

View File

@ -23,7 +23,6 @@ import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.page.PageData;
@ -169,26 +168,6 @@ public abstract class DaoUtil {
} while (hasNextBatch);
}
public static <T extends EntityId> void iterateWithKeyOffset(Function<PageLink, PageData<T>> findFunction, int pageSize, Consumer<List<T>> processor) {
List<T> data;
UUID last = null;
while (true) {
PageLink pageLink = PageLink.builder()
.pageSize(pageSize)
.sortOrder(SortOrder.of("id", SortOrder.Direction.ASC))
.idOffset(last)
.build();
data = findFunction.apply(pageLink).getData();
if (!data.isEmpty()) {
processor.accept(data);
last = data.get(data.size() - 1).getId();
} else {
break;
}
}
}
public static String getStringId(UUIDBased id) {
if (id != null) {
return id.toString();

View File

@ -80,7 +80,7 @@ public interface AlarmDao extends Dao<Alarm> {
PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UUID userId, PageLink pageLink);
PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, PageLink pageLink);
PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, AlarmId idOffset, PageLink pageLink);
void createEntityAlarmRecord(EntityAlarm entityAlarm);

View File

@ -313,9 +313,9 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
}
@Override
public PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, PageLink pageLink) {
log.trace("[{}] Executing findAlarmsByOriginatorId [{}]", tenantId, originatorId);
return alarmDao.findAlarmIdsByOriginatorId(tenantId, originatorId, pageLink);
public List<AlarmId> findAlarmIdsByOriginatorIdAndIdOffset(TenantId tenantId, EntityId originatorId, AlarmId idOffset, int limit) {
log.trace("[{}] Executing findAlarmIdsByOriginatorIdAndIdOffset [{}][{}]", tenantId, originatorId, idOffset);
return alarmDao.findAlarmIdsByOriginatorId(tenantId, originatorId, idOffset, new PageLink(limit)).getData();
}
@Override

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.entity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.Dao;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Slf4j
public class EntityDaoRegistry {
private final Map<EntityType, Dao<?>> daos;
private EntityDaoRegistry(List<Dao<?>> daos) {
this.daos = daos.stream().filter(dao -> dao.getEntityType() != null)
.collect(Collectors.toMap(Dao::getEntityType, dao -> dao));
}
@SuppressWarnings("unchecked")
public <T> Dao<T> getDao(EntityType entityType) {
Dao<T> dao = (Dao<T>) daos.get(entityType);
if (dao == null) {
throw new IllegalArgumentException("Missing dao for entity type " + entityType);
}
return dao;
}
}

View File

@ -19,7 +19,9 @@ import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.Dao;
@ -41,6 +43,9 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
extends JpaAbstractDaoListeningExecutorService
implements Dao<D> {
@Autowired
protected JdbcTemplate jdbcTemplate;
protected abstract Class<E> getEntityClass();
protected abstract JpaRepository<E, UUID> getRepository();
@ -117,6 +122,20 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
ids.forEach(repository::deleteById);
}
@Override
public List<UUID> findIdsByTenantIdAndIdOffset(TenantId tenantId, UUID idOffset, int limit) {
String tableName = Optional.ofNullable(getEntityType())
.orElseThrow(() -> new IllegalArgumentException("Entity type not specified"))
.getTableName();
if (idOffset == null) {
return jdbcTemplate.queryForList("SELECT id FROM " + tableName + " WHERE tenant_id = ? ORDER BY id LIMIT ?",
UUID.class, tenantId.getId(), limit);
} else {
return jdbcTemplate.queryForList("SELECT id FROM " + tableName + " WHERE tenant_id = ? AND id > ? ORDER BY id LIMIT ?",
UUID.class, tenantId.getId(), idOffset, limit);
}
}
@Override
public List<D> find(TenantId tenantId) {
List<E> entities = Lists.newArrayList(getRepository().findAll());

View File

@ -297,8 +297,8 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
}
@Override
public PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, PageLink pageLink) {
return DaoUtil.pageToPageData(alarmRepository.findAlarmIdsByOriginatorId(tenantId.getId(), originatorId.getId(), pageLink.getIdOffset(), DaoUtil.toPageable(pageLink)))
public PageData<AlarmId> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, AlarmId idOffset, PageLink pageLink) {
return DaoUtil.pageToPageData(alarmRepository.findAlarmIdsByOriginatorId(tenantId.getId(), originatorId.getId(), idOffset != null ? idOffset.getId() : null, DaoUtil.toPageable(pageLink)))
.mapData(AlarmId::new);
}