From d1292a0d04596c3453514b2d973b38a469221ecc Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Mon, 7 Jun 2021 13:23:01 +0300 Subject: [PATCH] refactoring ttl cleanup services --- .../schema_update_psql_drop_partitions.sql | 174 +++++++++++++----- .../service/ttl/EdgeEventsCleanUpService.java | 4 +- .../service/ttl/EventsCleanUpService.java | 4 +- .../service/ttl/TimeseriesCleanUpService.java | 4 + .../server/dao/edge/EdgeService.java | 2 +- .../server/dao/event/EventService.java | 2 +- .../thingsboard/server/dao/edge/EdgeDao.java | 3 +- .../server/dao/edge/EdgeServiceImpl.java | 4 +- .../server/dao/event/BaseEventService.java | 4 +- .../server/dao/event/EventDao.java | 3 +- .../server/dao/sql/edge/JpaEdgeDao.java | 20 +- .../server/dao/sql/event/JpaBaseEventDao.java | 23 ++- .../dao/sqlts/AbstractSqlTimeseriesDao.java | 16 +- .../dao/sqlts/psql/JpaPsqlTimeseriesDao.java | 17 +- dao/src/main/resources/sql/schema-ts-psql.sql | 174 +++++++++++++----- 15 files changed, 312 insertions(+), 142 deletions(-) diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql index fcc5c6f232..9d336e0330 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql @@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar LANGUAGE plpgsql AS $$ DECLARE - max_tenant_ttl bigint; - max_customer_ttl bigint; - max_ttl bigint; - date timestamp; - partition_by_max_ttl_date varchar; - partition_month varchar; - partition_day varchar; - partition_year varchar; - partition varchar; - partition_to_delete varchar; - + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_by_max_ttl_month varchar; + partition_by_max_ttl_day varchar; + partition_by_max_ttl_year varchar; + partition varchar; + partition_year integer; + partition_month integer; + partition_day integer; BEGIN SELECT max(attribute_kv.long_v) @@ -45,53 +46,138 @@ BEGIN if max_ttl IS NOT NULL AND max_ttl > 0 THEN date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Date by max ttl: %', date; RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; IF partition_by_max_ttl_date IS NOT NULL THEN CASE WHEN partition_type = 'DAYS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); WHEN partition_type = 'MONTHS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); ELSE - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); END CASE; - FOR partition IN SELECT tablename - FROM pg_tables - WHERE schemaname = 'public' - AND tablename like 'ts_kv_' || '%' - AND tablename != 'ts_kv_latest' - AND tablename != 'ts_kv_dictionary' - AND tablename != 'ts_kv_indefinite' - LOOP - IF partition != partition_by_max_ttl_date THEN - IF partition_year IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN - partition_to_delete := partition; - ELSE - IF partition_month IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN - partition_to_delete := partition; + IF partition_by_max_ttl_year IS NULL THEN + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!'; + ELSE + IF partition_type = 'YEARS' THEN + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END LOOP; + ELSE + IF partition_type = 'MONTHS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; ELSE - IF partition_day IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN - partition_to_delete := partition; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_year = partition_by_max_ttl_year::integer THEN + IF partition_month >= partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; END IF; END IF; END IF; + END LOOP; + END IF; + ELSE + IF partition_type = 'DAYS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + IF partition_by_max_ttl_day IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_month > partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_month < partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_day := SPLIT_PART(partition, '_', 5)::integer; + IF partition_day >= partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_day < partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END LOOP; END IF; END IF; END IF; - IF partition_to_delete IS NOT NULL THEN - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete); - partition_to_delete := NULL; - deleted := deleted + 1; - END IF; END IF; - END LOOP; + END IF; + END IF; END IF; END IF; END @@ -107,8 +193,6 @@ BEGIN partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); WHEN partition_type = 'YEARS' THEN partition := 'ts_kv_' || to_char(date, 'yyyy'); - WHEN partition_type = 'INDEFINITE' THEN - partition := NULL; ELSE partition := NULL; END CASE; diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java index 7aa8881bed..3cdd1f71bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java @@ -45,9 +45,7 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService { @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") public void cleanUp() { if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { - log.info("Going to cleanup old edge events using ttl: {}s", ttl); - long totalEdgeEventsRemoved = edgeService.cleanupEvents(ttl); - log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved); + edgeService.cleanupEvents(ttl); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java index b244e1b8fd..a51910b7ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java @@ -48,9 +48,7 @@ public class EventsCleanUpService extends AbstractCleanUpService { @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") public void cleanUp() { if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { - log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugTtl, ttl); - long totalEventsRemoved = eventService.cleanupEvents(ttl, debugTtl); - log.info("Total events removed by TTL: [{}]", totalEventsRemoved); + eventService.cleanupEvents(ttl, debugTtl); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java index 7094b34e62..55c746b580 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java @@ -18,11 +18,15 @@ package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; +@TbCoreComponent @Slf4j +@Service public class TimeseriesCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.ts.ts_key_value_ttl}") diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 8567077eb1..53472fd261 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -94,5 +94,5 @@ public interface EdgeService { String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); - long cleanupEvents(long ttl); + void cleanupEvents(long ttl); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java index dce49afdb1..db1c77697e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java @@ -46,6 +46,6 @@ public interface EventService { void removeEvents(TenantId tenantId, EntityId entityId); - long cleanupEvents(long ttl, long debugTtl); + void cleanupEvents(long ttl, long debugTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java index eda1075cd6..60ef98a801 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java @@ -180,7 +180,6 @@ public interface EdgeDao extends Dao { /** * Executes stored procedure to cleanup old edge events. * @param ttl the ttl for edge events in seconds - * @return the number of deleted edge events */ - long cleanupEvents(long ttl); + void cleanupEvents(long ttl); } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index d06ce8d722..0089853969 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -628,8 +628,8 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic } @Override - public long cleanupEvents(long ttl) { - return edgeDao.cleanupEvents(ttl); + public void cleanupEvents(long ttl) { + edgeDao.cleanupEvents(ttl); } private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index 4cfd0a3ffa..2785df90df 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -132,8 +132,8 @@ public class BaseEventService implements EventService { } @Override - public long cleanupEvents(long ttl, long debugTtl) { - return eventDao.cleanupEvents(ttl, debugTtl); + public void cleanupEvents(long ttl, long debugTtl) { + eventDao.cleanupEvents(ttl, debugTtl); } private DataValidator eventValidator = diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java index d943937ca3..ceacbffd50 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java @@ -106,7 +106,6 @@ public interface EventDao extends Dao { * Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events. * @param otherEventsTtl the ttl for events in seconds * @param debugEventsTtl the ttl for debug events in seconds - * @return the number of deleted events */ - long cleanupEvents(long otherEventsTtl, long debugEventsTtl); + void cleanupEvents(long otherEventsTtl, long debugEventsTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java index 8e7112e81d..ec6d730230 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java @@ -40,9 +40,9 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity; import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; -import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -198,17 +198,19 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao imple } @Override - public long cleanupEvents(long ttl) { + public void cleanupEvents(long ttl) { try { - CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_edge_events_by_ttl(?,?,?)}"); + log.info("Going to cleanup old edge events using ttl: {}s", ttl); + PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_edge_events_by_ttl(?,?)"); stmt.setLong(1, ttl); - stmt.registerOutParameter(3, Types.BIGINT); - stmt.executeUpdate(); + stmt.setLong(2, 0); + stmt.execute(); printWarnings(stmt); - return stmt.getLong(3); + ResultSet resultSet = stmt.getResultSet(); + resultSet.next(); + log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1)); } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - return 0; + log.error("SQLException occurred during edge events TTL task execution ", e); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index edaf95a29d..cde7d75d32 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -39,12 +39,9 @@ import org.thingsboard.server.dao.event.EventDao; import org.thingsboard.server.dao.model.sql.EventEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; -import javax.sql.DataSource; -import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.SQLWarning; -import java.sql.Statement; -import java.sql.Types; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -262,18 +259,20 @@ public class JpaBaseEventDao extends JpaAbstractDao implemen } @Override - public long cleanupEvents(long otherEventsTtl, long debugEventsTtl) { + public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) { try { - CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_events_by_ttl(?,?,?)}"); + log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl); + PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_events_by_ttl(?,?,?)"); stmt.setLong(1, otherEventsTtl); stmt.setLong(2, debugEventsTtl); - stmt.registerOutParameter(3, Types.BIGINT); - stmt.executeUpdate(); + stmt.setLong(3, 0); + stmt.execute(); printWarnings(stmt); - return stmt.getLong(3); + ResultSet resultSet = stmt.getResultSet(); + resultSet.next(); + log.info("Total events removed by TTL: [{}]", resultSet.getLong(1)); } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - return 0; + log.error("SQLException occurred during events TTL task execution ", e); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index abd8f15e77..f1636740c3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -29,9 +29,9 @@ import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import javax.annotation.Nullable; -import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Types; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -69,15 +69,17 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries public void cleanup(long systemTtl) { try { log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl); - CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_timeseries_by_ttl(?,?,?)}"); + PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_timeseries_by_ttl(?,?,?)"); stmt.setObject(1, ModelConstants.NULL_UUID); stmt.setLong(2, systemTtl); - stmt.registerOutParameter(3, Types.BIGINT); - stmt.executeUpdate(); + stmt.setLong(3, 0); + stmt.execute(); printWarnings(stmt); - log.info("Total telemetry removed stats by TTL for entities: [{}]", stmt.getLong(3)); + ResultSet resultSet = stmt.getResultSet(); + resultSet.next(); + log.info("Total telemetry removed stats by TTL for entities: [{}]", resultSet.getLong(1)); } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); + log.error("SQLException occurred during timeseries TTL task execution ", e); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java index 594f77ae78..9b18dcde7e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java @@ -27,7 +27,6 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository; @@ -36,9 +35,9 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; -import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Types; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -107,13 +106,15 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa private void cleanupPartitions(long systemTtl) { try { log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl); - CallableStatement stmt = dataSource.getConnection().prepareCall("{call drop_partitions_by_max_ttl(?,?,?)}"); - stmt.setObject(1, partitioning); + PreparedStatement stmt = dataSource.getConnection().prepareStatement("call drop_partitions_by_max_ttl(?,?,?)"); + stmt.setString(1, partitioning); stmt.setLong(2, systemTtl); - stmt.registerOutParameter(3, Types.BIGINT); - stmt.executeUpdate(); + stmt.setLong(3, 0); + stmt.execute(); printWarnings(stmt); - log.info("Total partitions removed by TTL: [{}]", stmt.getLong(3)); + ResultSet resultSet = stmt.getResultSet(); + resultSet.next(); + log.info("Total partitions removed by TTL: [{}]", resultSet.getLong(1)); } catch (SQLException e) { log.error("SQLException occurred during TTL task execution ", e); } diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index 5683cc0a17..2744ff5a07 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar LANGUAGE plpgsql AS $$ DECLARE - max_tenant_ttl bigint; - max_customer_ttl bigint; - max_ttl bigint; - date timestamp; - partition_by_max_ttl_date varchar; - partition_month varchar; - partition_day varchar; - partition_year varchar; - partition varchar; - partition_to_delete varchar; - + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_by_max_ttl_month varchar; + partition_by_max_ttl_day varchar; + partition_by_max_ttl_year varchar; + partition varchar; + partition_year integer; + partition_month integer; + partition_day integer; BEGIN SELECT max(attribute_kv.long_v) @@ -65,53 +66,138 @@ BEGIN if max_ttl IS NOT NULL AND max_ttl > 0 THEN date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Date by max ttl: %', date; RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; IF partition_by_max_ttl_date IS NOT NULL THEN CASE WHEN partition_type = 'DAYS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); WHEN partition_type = 'MONTHS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); ELSE - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); END CASE; - FOR partition IN SELECT tablename - FROM pg_tables - WHERE schemaname = 'public' - AND tablename like 'ts_kv_' || '%' - AND tablename != 'ts_kv_latest' - AND tablename != 'ts_kv_dictionary' - AND tablename != 'ts_kv_indefinite' - LOOP - IF partition != partition_by_max_ttl_date THEN - IF partition_year IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN - partition_to_delete := partition; - ELSE - IF partition_month IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN - partition_to_delete := partition; + IF partition_by_max_ttl_year IS NULL THEN + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!'; + ELSE + IF partition_type = 'YEARS' THEN + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END LOOP; + ELSE + IF partition_type = 'MONTHS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; ELSE - IF partition_day IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN - partition_to_delete := partition; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_year = partition_by_max_ttl_year::integer THEN + IF partition_month >= partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; END IF; END IF; END IF; + END LOOP; + END IF; + ELSE + IF partition_type = 'DAYS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + IF partition_by_max_ttl_day IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_month > partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_month < partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_day := SPLIT_PART(partition, '_', 5)::integer; + IF partition_day >= partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_day < partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END LOOP; END IF; END IF; END IF; - IF partition_to_delete IS NOT NULL THEN - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete); - partition_to_delete := NULL; - deleted := deleted + 1; - END IF; END IF; - END LOOP; + END IF; + END IF; END IF; END IF; END @@ -127,8 +213,6 @@ BEGIN partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); WHEN partition_type = 'YEARS' THEN partition := 'ts_kv_' || to_char(date, 'yyyy'); - WHEN partition_type = 'INDEFINITE' THEN - partition := NULL; ELSE partition := NULL; END CASE;