From 9bb74b96dca6921d3b29f4e44992f698c2d1ca40 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Wed, 26 May 2021 15:20:15 +0300 Subject: [PATCH] Implement alarms removal by TTL --- .../ttl/alarms/AlarmsCleanUpService.java | 83 +++++++++++++++++++ .../src/main/resources/thingsboard.yml | 3 + .../server/common/data/TenantProfile.java | 7 ++ .../server/common/data/page/PageData.java | 7 +- .../DefaultTenantProfileConfiguration.java | 1 + .../java/org/thingsboard/server/dao/Dao.java | 3 + .../server/dao/alarm/AlarmDao.java | 4 + .../server/dao/sql/JpaAbstractDao.java | 7 ++ .../server/dao/sql/alarm/AlarmRepository.java | 5 ++ .../server/dao/sql/alarm/JpaAlarmDao.java | 6 ++ .../server/dao/sql/tenant/JpaTenantDao.java | 6 ++ .../dao/sql/tenant/TenantRepository.java | 4 + .../server/dao/tenant/TenantDao.java | 4 +- 13 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java new file mode 100644 index 0000000000..fe89af7faa --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl.alarms; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.alarm.AlarmDao; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantDao; +import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@PsqlDao +@Service +@Slf4j +@RequiredArgsConstructor +public class AlarmsCleanUpService { + @Value("${sql.ttl.alarms.removal_batch_size}") + private Integer removalBatchSize; + + private final AlarmDao alarmDao; + private final TenantDao tenantDao; + private final PartitionService partitionService; + private final TbTenantProfileCache tenantProfileCache; + + @Scheduled(initialDelayString = "${sql.ttl.alarms.checking_interval}", fixedDelayString = "${sql.ttl.alarms.checking_interval}") + public void cleanUp() { + if (!partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition()) { + return; + } + + PageLink tenantsBatchRequest = new PageLink(65536, 0); + PageLink alarmsRemovalBatchRequest = new PageLink(removalBatchSize, 0); + long currentTime = System.currentTimeMillis(); + + PageData tenantsIds; + do { + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest); + tenantsIds.getData().forEach(tenantId -> { + Optional tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) { + return; + } + + PageData toRemove; + long outdatageTime = currentTime - TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays()); + log.info("Cleaning up outdated alarms for tenant {}", tenantId); + do { + toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(outdatageTime, tenantId, alarmsRemovalBatchRequest); + alarmDao.removeAllByIds(toRemove.getData()); + } while (toRemove.hasNext()); + }); + + tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); + } while (tenantsIds.hasNext()); + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1cc89feab2..8edff5366e 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -273,6 +273,9 @@ sql: enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}" execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month + alarms: + checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:200}" # To delete outdated alarms not all at once but in batches # Actor system parameters actors: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java index a0fefea6cc..e3bb6c6a4e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.validation.NoXss; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Optional; import static org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo.mapper; @@ -92,6 +93,12 @@ public class TenantProfile extends SearchTextBased implements H } } + public Optional getProfileConfiguration() { + return Optional.ofNullable(getProfileData().getConfiguration()) + .filter(profileConfiguration -> profileConfiguration instanceof DefaultTenantProfileConfiguration) + .map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration); + } + public TenantProfileData createDefaultTenantProfileData() { TenantProfileData tpd = new TenantProfileData(); tpd.setConfiguration(new DefaultTenantProfileConfiguration()); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/page/PageData.java b/common/data/src/main/java/org/thingsboard/server/common/data/page/PageData.java index 2020245ea1..6ffbce4d3d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/page/PageData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/page/PageData.java @@ -17,10 +17,11 @@ package org.thingsboard.server.common.data.page; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.thingsboard.server.common.data.BaseData; import java.util.Collections; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; public class PageData { @@ -61,4 +62,8 @@ public class PageData { return hasNext; } + public PageData mapData(Function mapper) { + return new PageData<>(getData().stream().map(mapper).collect(Collectors.toList()), getTotalPages(), getTotalElements(), hasNext()); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index b9bd72b0db..34f0b38ebf 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -53,6 +53,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private long maxCreatedAlarms; private int defaultStorageTtlDays; + private int alarmsTtlDays; private double warnThreshold; diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java index 0111abdbbe..a5d4dfd9d1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.TenantId; +import java.util.Collection; import java.util.List; import java.util.UUID; @@ -33,4 +34,6 @@ public interface Dao { boolean removeById(TenantId tenantId, UUID id); + void removeAllByIds(Collection ids); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java index eb873db679..bcef402ff9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.dao.Dao; @@ -54,4 +55,7 @@ public interface AlarmDao extends Dao { AlarmDataQuery query, Collection orderedEntityIds); Set findAlarmSeverities(TenantId tenantId, EntityId entityId, Set status); + + PageData findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 6bad1af0e5..18852cbf11 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -26,6 +26,7 @@ import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.BaseEntity; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -87,6 +88,12 @@ public abstract class JpaAbstractDao, D> return !getCrudRepository().existsById(id); } + @Transactional + public void removeAllByIds(Collection ids) { + CrudRepository repository = getCrudRepository(); + ids.forEach(repository::deleteById); + } + @Override public List find(TenantId tenantId) { List entities = Lists.newArrayList(getCrudRepository().findAll()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/AlarmRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/AlarmRepository.java index b4c0ac09c6..b6eef91ac7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/AlarmRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/AlarmRepository.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.alarm; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.query.Param; @@ -159,4 +160,8 @@ public interface AlarmRepository extends CrudRepository { @Param("affectedEntityId") UUID affectedEntityId, @Param("affectedEntityType") String affectedEntityType, @Param("alarmStatuses") Set alarmStatuses); + + @Query("SELECT a.id FROM AlarmEntity a WHERE a.createdTime < :time AND a.endTs < :time") + Page findAlarmsIdsByEndTsBefore(@Param("time") Long time, Pageable pageable); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java index cf222413b0..f7da17d6ed 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.dao.DaoUtil; @@ -161,4 +162,9 @@ public class JpaAlarmDao extends JpaAbstractDao implements A public Set findAlarmSeverities(TenantId tenantId, EntityId entityId, Set statuses) { return alarmRepository.findAlarmSeverities(tenantId.getId(), entityId.getId(), entityId.getEntityType().name(), statuses); } + + @Override + public PageData findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink) { + return DaoUtil.pageToPageData(alarmRepository.findAlarmsIdsByEndTsBefore(time, DaoUtil.toPageable(pageLink))); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/JpaTenantDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/JpaTenantDao.java index d2a67d50ce..ff9dce6c96 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/JpaTenantDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/JpaTenantDao.java @@ -74,4 +74,10 @@ public class JpaTenantDao extends JpaAbstractSearchTextDao Objects.toString(pageLink.getTextSearch(), ""), DaoUtil.toPageable(pageLink, TenantInfoEntity.tenantInfoColumnMap))); } + + @Override + public PageData findTenantsIds(PageLink pageLink) { + return DaoUtil.pageToPageData(tenantRepository.findTenantsIds(DaoUtil.toPageable(pageLink))).mapData(TenantId::new); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/TenantRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/TenantRepository.java index b43d70197c..8ab12e0bb5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/TenantRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/tenant/TenantRepository.java @@ -50,4 +50,8 @@ public interface TenantRepository extends PagingAndSortingRepository findTenantInfoByRegionNextPage(@Param("region") String region, @Param("textSearch") String textSearch, Pageable pageable); + + @Query("SELECT t.id FROM TenantEntity t") + Page findTenantsIds(Pageable pageable); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantDao.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantDao.java index bff1c2c8a9..5bceb35376 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantDao.java @@ -46,5 +46,7 @@ public interface TenantDao extends Dao { PageData findTenantsByRegion(TenantId tenantId, String region, PageLink pageLink); PageData findTenantInfosByRegion(TenantId tenantId, String region, PageLink pageLink); - + + PageData findTenantsIds(PageLink pageLink); + }