refactoring ttl cleanup services
This commit is contained in:
parent
083dc38cf8
commit
6af34cf043
@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
|
|||||||
LANGUAGE plpgsql AS
|
LANGUAGE plpgsql AS
|
||||||
$$
|
$$
|
||||||
DECLARE
|
DECLARE
|
||||||
max_tenant_ttl bigint;
|
max_tenant_ttl bigint;
|
||||||
max_customer_ttl bigint;
|
max_customer_ttl bigint;
|
||||||
max_ttl bigint;
|
max_ttl bigint;
|
||||||
date timestamp;
|
date timestamp;
|
||||||
partition_by_max_ttl_date varchar;
|
partition_by_max_ttl_date varchar;
|
||||||
partition_month varchar;
|
partition_by_max_ttl_month varchar;
|
||||||
partition_day varchar;
|
partition_by_max_ttl_day varchar;
|
||||||
partition_year varchar;
|
partition_by_max_ttl_year varchar;
|
||||||
partition varchar;
|
partition varchar;
|
||||||
partition_to_delete varchar;
|
partition_year integer;
|
||||||
|
partition_month integer;
|
||||||
|
partition_day integer;
|
||||||
|
|
||||||
BEGIN
|
BEGIN
|
||||||
SELECT max(attribute_kv.long_v)
|
SELECT max(attribute_kv.long_v)
|
||||||
@ -45,53 +46,138 @@ BEGIN
|
|||||||
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
||||||
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
|
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
|
||||||
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
||||||
|
RAISE NOTICE 'Date by max ttl: %', date;
|
||||||
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
||||||
IF partition_by_max_ttl_date IS NOT NULL THEN
|
IF partition_by_max_ttl_date IS NOT NULL THEN
|
||||||
CASE
|
CASE
|
||||||
WHEN partition_type = 'DAYS' THEN
|
WHEN partition_type = 'DAYS' THEN
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||||
partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
||||||
WHEN partition_type = 'MONTHS' THEN
|
WHEN partition_type = 'MONTHS' THEN
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||||
ELSE
|
ELSE
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
END CASE;
|
END CASE;
|
||||||
FOR partition IN SELECT tablename
|
IF partition_by_max_ttl_year IS NULL THEN
|
||||||
FROM pg_tables
|
RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
|
||||||
WHERE schemaname = 'public'
|
ELSE
|
||||||
AND tablename like 'ts_kv_' || '%'
|
IF partition_type = 'YEARS' THEN
|
||||||
AND tablename != 'ts_kv_latest'
|
FOR partition IN SELECT tablename
|
||||||
AND tablename != 'ts_kv_dictionary'
|
FROM pg_tables
|
||||||
AND tablename != 'ts_kv_indefinite'
|
WHERE schemaname = 'public'
|
||||||
LOOP
|
AND tablename like 'ts_kv_' || '%'
|
||||||
IF partition != partition_by_max_ttl_date THEN
|
AND tablename != 'ts_kv_latest'
|
||||||
IF partition_year IS NOT NULL THEN
|
AND tablename != 'ts_kv_dictionary'
|
||||||
IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
|
AND tablename != 'ts_kv_indefinite'
|
||||||
partition_to_delete := partition;
|
AND tablename != partition_by_max_ttl_date
|
||||||
ELSE
|
LOOP
|
||||||
IF partition_month IS NOT NULL THEN
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
partition_to_delete := partition;
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
ELSE
|
||||||
|
IF partition_type = 'MONTHS' THEN
|
||||||
|
IF partition_by_max_ttl_month IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
|
||||||
|
ELSE
|
||||||
|
FOR partition IN SELECT tablename
|
||||||
|
FROM pg_tables
|
||||||
|
WHERE schemaname = 'public'
|
||||||
|
AND tablename like 'ts_kv_' || '%'
|
||||||
|
AND tablename != 'ts_kv_latest'
|
||||||
|
AND tablename != 'ts_kv_dictionary'
|
||||||
|
AND tablename != 'ts_kv_indefinite'
|
||||||
|
AND tablename != partition_by_max_ttl_date
|
||||||
|
LOOP
|
||||||
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
|
IF partition_year > partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
ELSE
|
ELSE
|
||||||
IF partition_day IS NOT NULL THEN
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
partition_to_delete := partition;
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_month := SPLIT_PART(partition, '_', 4)::integer;
|
||||||
|
IF partition_year = partition_by_max_ttl_year::integer THEN
|
||||||
|
IF partition_month >= partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
END IF;
|
||||||
|
ELSE
|
||||||
|
IF partition_type = 'DAYS' THEN
|
||||||
|
IF partition_by_max_ttl_month IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
|
||||||
|
ELSE
|
||||||
|
IF partition_by_max_ttl_day IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
|
||||||
|
ELSE
|
||||||
|
FOR partition IN SELECT tablename
|
||||||
|
FROM pg_tables
|
||||||
|
WHERE schemaname = 'public'
|
||||||
|
AND tablename like 'ts_kv_' || '%'
|
||||||
|
AND tablename != 'ts_kv_latest'
|
||||||
|
AND tablename != 'ts_kv_dictionary'
|
||||||
|
AND tablename != 'ts_kv_indefinite'
|
||||||
|
AND tablename != partition_by_max_ttl_date
|
||||||
|
LOOP
|
||||||
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
|
IF partition_year > partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_month := SPLIT_PART(partition, '_', 4)::integer;
|
||||||
|
IF partition_month > partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_month < partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_day := SPLIT_PART(partition, '_', 5)::integer;
|
||||||
|
IF partition_day >= partition_by_max_ttl_day::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_day < partition_by_max_ttl_day::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
IF partition_to_delete IS NOT NULL THEN
|
|
||||||
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
|
|
||||||
EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);
|
|
||||||
partition_to_delete := NULL;
|
|
||||||
deleted := deleted + 1;
|
|
||||||
END IF;
|
|
||||||
END IF;
|
END IF;
|
||||||
END LOOP;
|
END IF;
|
||||||
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END
|
END
|
||||||
@ -107,8 +193,6 @@ BEGIN
|
|||||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
||||||
WHEN partition_type = 'YEARS' THEN
|
WHEN partition_type = 'YEARS' THEN
|
||||||
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
||||||
WHEN partition_type = 'INDEFINITE' THEN
|
|
||||||
partition := NULL;
|
|
||||||
ELSE
|
ELSE
|
||||||
partition := NULL;
|
partition := NULL;
|
||||||
END CASE;
|
END CASE;
|
||||||
|
|||||||
@ -45,9 +45,7 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
|
|||||||
@Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
|
@Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
|
||||||
public void cleanUp() {
|
public void cleanUp() {
|
||||||
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
||||||
log.info("Going to cleanup old edge events using ttl: {}s", ttl);
|
edgeService.cleanupEvents(ttl);
|
||||||
long totalEdgeEventsRemoved = edgeService.cleanupEvents(ttl);
|
|
||||||
log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -48,9 +48,7 @@ public class EventsCleanUpService extends AbstractCleanUpService {
|
|||||||
@Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
|
@Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
|
||||||
public void cleanUp() {
|
public void cleanUp() {
|
||||||
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
||||||
log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugTtl, ttl);
|
eventService.cleanupEvents(ttl, debugTtl);
|
||||||
long totalEventsRemoved = eventService.cleanupEvents(ttl, debugTtl);
|
|
||||||
log.info("Total events removed by TTL: [{}]", totalEventsRemoved);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -18,11 +18,15 @@ package org.thingsboard.server.service.ttl;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
|
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
|
||||||
|
|
||||||
|
@TbCoreComponent
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@Service
|
||||||
public class TimeseriesCleanUpService extends AbstractCleanUpService {
|
public class TimeseriesCleanUpService extends AbstractCleanUpService {
|
||||||
|
|
||||||
@Value("${sql.ttl.ts.ts_key_value_ttl}")
|
@Value("${sql.ttl.ts.ts_key_value_ttl}")
|
||||||
|
|||||||
@ -94,5 +94,5 @@ public interface EdgeService {
|
|||||||
|
|
||||||
String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId);
|
String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId);
|
||||||
|
|
||||||
long cleanupEvents(long ttl);
|
void cleanupEvents(long ttl);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,6 +46,6 @@ public interface EventService {
|
|||||||
|
|
||||||
void removeEvents(TenantId tenantId, EntityId entityId);
|
void removeEvents(TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
long cleanupEvents(long ttl, long debugTtl);
|
void cleanupEvents(long ttl, long debugTtl);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -180,7 +180,6 @@ public interface EdgeDao extends Dao<Edge> {
|
|||||||
/**
|
/**
|
||||||
* Executes stored procedure to cleanup old edge events.
|
* Executes stored procedure to cleanup old edge events.
|
||||||
* @param ttl the ttl for edge events in seconds
|
* @param ttl the ttl for edge events in seconds
|
||||||
* @return the number of deleted edge events
|
|
||||||
*/
|
*/
|
||||||
long cleanupEvents(long ttl);
|
void cleanupEvents(long ttl);
|
||||||
}
|
}
|
||||||
@ -628,8 +628,8 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long cleanupEvents(long ttl) {
|
public void cleanupEvents(long ttl) {
|
||||||
return edgeDao.cleanupEvents(ttl);
|
edgeDao.cleanupEvents(ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
|
private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
|
||||||
|
|||||||
@ -132,8 +132,8 @@ public class BaseEventService implements EventService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long cleanupEvents(long ttl, long debugTtl) {
|
public void cleanupEvents(long ttl, long debugTtl) {
|
||||||
return eventDao.cleanupEvents(ttl, debugTtl);
|
eventDao.cleanupEvents(ttl, debugTtl);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataValidator<Event> eventValidator =
|
private DataValidator<Event> eventValidator =
|
||||||
|
|||||||
@ -106,7 +106,6 @@ public interface EventDao extends Dao<Event> {
|
|||||||
* Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
|
* Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
|
||||||
* @param otherEventsTtl the ttl for events in seconds
|
* @param otherEventsTtl the ttl for events in seconds
|
||||||
* @param debugEventsTtl the ttl for debug events in seconds
|
* @param debugEventsTtl the ttl for debug events in seconds
|
||||||
* @return the number of deleted events
|
|
||||||
*/
|
*/
|
||||||
long cleanupEvents(long otherEventsTtl, long debugEventsTtl);
|
void cleanupEvents(long otherEventsTtl, long debugEventsTtl);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -40,9 +40,9 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity;
|
|||||||
import org.thingsboard.server.dao.relation.RelationDao;
|
import org.thingsboard.server.dao.relation.RelationDao;
|
||||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
|
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
|
||||||
|
|
||||||
import java.sql.CallableStatement;
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Types;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -198,17 +198,19 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao<EdgeEntity, Edge> imple
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long cleanupEvents(long ttl) {
|
public void cleanupEvents(long ttl) {
|
||||||
try {
|
try {
|
||||||
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_edge_events_by_ttl(?,?,?)}");
|
log.info("Going to cleanup old edge events using ttl: {}s", ttl);
|
||||||
|
PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_edge_events_by_ttl(?,?)");
|
||||||
stmt.setLong(1, ttl);
|
stmt.setLong(1, ttl);
|
||||||
stmt.registerOutParameter(3, Types.BIGINT);
|
stmt.setLong(2, 0);
|
||||||
stmt.executeUpdate();
|
stmt.execute();
|
||||||
printWarnings(stmt);
|
printWarnings(stmt);
|
||||||
return stmt.getLong(3);
|
ResultSet resultSet = stmt.getResultSet();
|
||||||
|
resultSet.next();
|
||||||
|
log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
log.error("SQLException occurred during TTL task execution ", e);
|
log.error("SQLException occurred during edge events TTL task execution ", e);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -39,12 +39,9 @@ import org.thingsboard.server.dao.event.EventDao;
|
|||||||
import org.thingsboard.server.dao.model.sql.EventEntity;
|
import org.thingsboard.server.dao.model.sql.EventEntity;
|
||||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.CallableStatement;
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.SQLWarning;
|
|
||||||
import java.sql.Statement;
|
|
||||||
import java.sql.Types;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -262,18 +259,20 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
|
public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
|
||||||
try {
|
try {
|
||||||
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_events_by_ttl(?,?,?)}");
|
log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl);
|
||||||
|
PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_events_by_ttl(?,?,?)");
|
||||||
stmt.setLong(1, otherEventsTtl);
|
stmt.setLong(1, otherEventsTtl);
|
||||||
stmt.setLong(2, debugEventsTtl);
|
stmt.setLong(2, debugEventsTtl);
|
||||||
stmt.registerOutParameter(3, Types.BIGINT);
|
stmt.setLong(3, 0);
|
||||||
stmt.executeUpdate();
|
stmt.execute();
|
||||||
printWarnings(stmt);
|
printWarnings(stmt);
|
||||||
return stmt.getLong(3);
|
ResultSet resultSet = stmt.getResultSet();
|
||||||
|
resultSet.next();
|
||||||
|
log.info("Total events removed by TTL: [{}]", resultSet.getLong(1));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
log.error("SQLException occurred during TTL task execution ", e);
|
log.error("SQLException occurred during events TTL task execution ", e);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -29,9 +29,9 @@ import org.thingsboard.server.dao.model.ModelConstants;
|
|||||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.sql.CallableStatement;
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Types;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -69,15 +69,17 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
|
|||||||
public void cleanup(long systemTtl) {
|
public void cleanup(long systemTtl) {
|
||||||
try {
|
try {
|
||||||
log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl);
|
log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl);
|
||||||
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_timeseries_by_ttl(?,?,?)}");
|
PreparedStatement stmt = dataSource.getConnection().prepareStatement("call cleanup_timeseries_by_ttl(?,?,?)");
|
||||||
stmt.setObject(1, ModelConstants.NULL_UUID);
|
stmt.setObject(1, ModelConstants.NULL_UUID);
|
||||||
stmt.setLong(2, systemTtl);
|
stmt.setLong(2, systemTtl);
|
||||||
stmt.registerOutParameter(3, Types.BIGINT);
|
stmt.setLong(3, 0);
|
||||||
stmt.executeUpdate();
|
stmt.execute();
|
||||||
printWarnings(stmt);
|
printWarnings(stmt);
|
||||||
log.info("Total telemetry removed stats by TTL for entities: [{}]", stmt.getLong(3));
|
ResultSet resultSet = stmt.getResultSet();
|
||||||
|
resultSet.next();
|
||||||
|
log.info("Total telemetry removed stats by TTL for entities: [{}]", resultSet.getLong(1));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
log.error("SQLException occurred during TTL task execution ", e);
|
log.error("SQLException occurred during timeseries TTL task execution ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -27,7 +27,6 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.dao.model.ModelConstants;
|
|
||||||
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
|
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
|
||||||
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
|
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
|
||||||
import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository;
|
import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository;
|
||||||
@ -36,9 +35,9 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
|
|||||||
import org.thingsboard.server.dao.util.PsqlDao;
|
import org.thingsboard.server.dao.util.PsqlDao;
|
||||||
import org.thingsboard.server.dao.util.SqlTsDao;
|
import org.thingsboard.server.dao.util.SqlTsDao;
|
||||||
|
|
||||||
import java.sql.CallableStatement;
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Types;
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
@ -107,13 +106,15 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
|
|||||||
private void cleanupPartitions(long systemTtl) {
|
private void cleanupPartitions(long systemTtl) {
|
||||||
try {
|
try {
|
||||||
log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl);
|
log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl);
|
||||||
CallableStatement stmt = dataSource.getConnection().prepareCall("{call drop_partitions_by_max_ttl(?,?,?)}");
|
PreparedStatement stmt = dataSource.getConnection().prepareStatement("call drop_partitions_by_max_ttl(?,?,?)");
|
||||||
stmt.setObject(1, partitioning);
|
stmt.setString(1, partitioning);
|
||||||
stmt.setLong(2, systemTtl);
|
stmt.setLong(2, systemTtl);
|
||||||
stmt.registerOutParameter(3, Types.BIGINT);
|
stmt.setLong(3, 0);
|
||||||
stmt.executeUpdate();
|
stmt.execute();
|
||||||
printWarnings(stmt);
|
printWarnings(stmt);
|
||||||
log.info("Total partitions removed by TTL: [{}]", stmt.getLong(3));
|
ResultSet resultSet = stmt.getResultSet();
|
||||||
|
resultSet.next();
|
||||||
|
log.info("Total partitions removed by TTL: [{}]", resultSet.getLong(1));
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
log.error("SQLException occurred during TTL task execution ", e);
|
log.error("SQLException occurred during TTL task execution ", e);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
|
|||||||
LANGUAGE plpgsql AS
|
LANGUAGE plpgsql AS
|
||||||
$$
|
$$
|
||||||
DECLARE
|
DECLARE
|
||||||
max_tenant_ttl bigint;
|
max_tenant_ttl bigint;
|
||||||
max_customer_ttl bigint;
|
max_customer_ttl bigint;
|
||||||
max_ttl bigint;
|
max_ttl bigint;
|
||||||
date timestamp;
|
date timestamp;
|
||||||
partition_by_max_ttl_date varchar;
|
partition_by_max_ttl_date varchar;
|
||||||
partition_month varchar;
|
partition_by_max_ttl_month varchar;
|
||||||
partition_day varchar;
|
partition_by_max_ttl_day varchar;
|
||||||
partition_year varchar;
|
partition_by_max_ttl_year varchar;
|
||||||
partition varchar;
|
partition varchar;
|
||||||
partition_to_delete varchar;
|
partition_year integer;
|
||||||
|
partition_month integer;
|
||||||
|
partition_day integer;
|
||||||
|
|
||||||
BEGIN
|
BEGIN
|
||||||
SELECT max(attribute_kv.long_v)
|
SELECT max(attribute_kv.long_v)
|
||||||
@ -65,53 +66,138 @@ BEGIN
|
|||||||
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
||||||
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
|
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
|
||||||
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
||||||
|
RAISE NOTICE 'Date by max ttl: %', date;
|
||||||
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
||||||
IF partition_by_max_ttl_date IS NOT NULL THEN
|
IF partition_by_max_ttl_date IS NOT NULL THEN
|
||||||
CASE
|
CASE
|
||||||
WHEN partition_type = 'DAYS' THEN
|
WHEN partition_type = 'DAYS' THEN
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||||
partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
||||||
WHEN partition_type = 'MONTHS' THEN
|
WHEN partition_type = 'MONTHS' THEN
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||||
ELSE
|
ELSE
|
||||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||||
END CASE;
|
END CASE;
|
||||||
FOR partition IN SELECT tablename
|
IF partition_by_max_ttl_year IS NULL THEN
|
||||||
FROM pg_tables
|
RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
|
||||||
WHERE schemaname = 'public'
|
ELSE
|
||||||
AND tablename like 'ts_kv_' || '%'
|
IF partition_type = 'YEARS' THEN
|
||||||
AND tablename != 'ts_kv_latest'
|
FOR partition IN SELECT tablename
|
||||||
AND tablename != 'ts_kv_dictionary'
|
FROM pg_tables
|
||||||
AND tablename != 'ts_kv_indefinite'
|
WHERE schemaname = 'public'
|
||||||
LOOP
|
AND tablename like 'ts_kv_' || '%'
|
||||||
IF partition != partition_by_max_ttl_date THEN
|
AND tablename != 'ts_kv_latest'
|
||||||
IF partition_year IS NOT NULL THEN
|
AND tablename != 'ts_kv_dictionary'
|
||||||
IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
|
AND tablename != 'ts_kv_indefinite'
|
||||||
partition_to_delete := partition;
|
AND tablename != partition_by_max_ttl_date
|
||||||
ELSE
|
LOOP
|
||||||
IF partition_month IS NOT NULL THEN
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
partition_to_delete := partition;
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
ELSE
|
||||||
|
IF partition_type = 'MONTHS' THEN
|
||||||
|
IF partition_by_max_ttl_month IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
|
||||||
|
ELSE
|
||||||
|
FOR partition IN SELECT tablename
|
||||||
|
FROM pg_tables
|
||||||
|
WHERE schemaname = 'public'
|
||||||
|
AND tablename like 'ts_kv_' || '%'
|
||||||
|
AND tablename != 'ts_kv_latest'
|
||||||
|
AND tablename != 'ts_kv_dictionary'
|
||||||
|
AND tablename != 'ts_kv_indefinite'
|
||||||
|
AND tablename != partition_by_max_ttl_date
|
||||||
|
LOOP
|
||||||
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
|
IF partition_year > partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
ELSE
|
ELSE
|
||||||
IF partition_day IS NOT NULL THEN
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
partition_to_delete := partition;
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_month := SPLIT_PART(partition, '_', 4)::integer;
|
||||||
|
IF partition_year = partition_by_max_ttl_year::integer THEN
|
||||||
|
IF partition_month >= partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
END IF;
|
||||||
|
ELSE
|
||||||
|
IF partition_type = 'DAYS' THEN
|
||||||
|
IF partition_by_max_ttl_month IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
|
||||||
|
ELSE
|
||||||
|
IF partition_by_max_ttl_day IS NULL THEN
|
||||||
|
RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
|
||||||
|
ELSE
|
||||||
|
FOR partition IN SELECT tablename
|
||||||
|
FROM pg_tables
|
||||||
|
WHERE schemaname = 'public'
|
||||||
|
AND tablename like 'ts_kv_' || '%'
|
||||||
|
AND tablename != 'ts_kv_latest'
|
||||||
|
AND tablename != 'ts_kv_dictionary'
|
||||||
|
AND tablename != 'ts_kv_indefinite'
|
||||||
|
AND tablename != partition_by_max_ttl_date
|
||||||
|
LOOP
|
||||||
|
partition_year := SPLIT_PART(partition, '_', 3)::integer;
|
||||||
|
IF partition_year > partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_year < partition_by_max_ttl_year::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_month := SPLIT_PART(partition, '_', 4)::integer;
|
||||||
|
IF partition_month > partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_month < partition_by_max_ttl_month::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
ELSE
|
||||||
|
partition_day := SPLIT_PART(partition, '_', 5)::integer;
|
||||||
|
IF partition_day >= partition_by_max_ttl_day::integer THEN
|
||||||
|
RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
|
||||||
|
CONTINUE;
|
||||||
|
ELSE
|
||||||
|
IF partition_day < partition_by_max_ttl_day::integer THEN
|
||||||
|
RAISE NOTICE 'Partition to delete by max ttl: %', partition;
|
||||||
|
EXECUTE format('DROP TABLE IF EXISTS %I', partition);
|
||||||
|
deleted := deleted + 1;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
IF partition_to_delete IS NOT NULL THEN
|
|
||||||
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
|
|
||||||
EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);
|
|
||||||
partition_to_delete := NULL;
|
|
||||||
deleted := deleted + 1;
|
|
||||||
END IF;
|
|
||||||
END IF;
|
END IF;
|
||||||
END LOOP;
|
END IF;
|
||||||
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END IF;
|
END IF;
|
||||||
END
|
END
|
||||||
@ -127,8 +213,6 @@ BEGIN
|
|||||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
||||||
WHEN partition_type = 'YEARS' THEN
|
WHEN partition_type = 'YEARS' THEN
|
||||||
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
||||||
WHEN partition_type = 'INDEFINITE' THEN
|
|
||||||
partition := NULL;
|
|
||||||
ELSE
|
ELSE
|
||||||
partition := NULL;
|
partition := NULL;
|
||||||
END CASE;
|
END CASE;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user