From 11479935c2d511288c04aaca87a70e8311388ab4 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 7 Apr 2020 11:55:26 +0300 Subject: [PATCH 1/2] SQL TTL Implemenation * init commit * improvements for ttl-functions * added drop partitions by ttl function * add load ttl function to upgrade script * fix typo * add IF NOT EXISTS for json_v in timescale upgrade * changed logic for removing customer records from ts_kv * improvements in upgrade scenario * improvements * added intial delay for TTL task execution --- .../schema_update_psql_drop_partitions.sql | 123 ++++++++ .../data/upgrade/2.4.3/schema_update_ttl.sql | 125 ++++++++ .../AbstractSqlTsDatabaseUpgradeService.java | 24 +- .../install/PsqlTsDatabaseUpgradeService.java | 60 ++-- .../TimescaleTsDatabaseUpgradeService.java | 54 ++-- .../ttl/AbstractTimeseriesCleanUpService.java | 89 ++++++ .../ttl/PsqlTimeseriesCleanUpService.java | 41 +++ .../TimescaleTimeseriesCleanUpService.java | 35 +++ .../src/main/resources/thingsboard.yml | 4 + .../server/dao/util/PsqlTsDao.java | 21 ++ .../main/resources/sql/schema-timescale.sql | 120 +++++++- dao/src/main/resources/sql/schema-ts-psql.sql | 271 ++++++++++++++++-- 12 files changed, 896 insertions(+), 71 deletions(-) create mode 100644 application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql create mode 100644 application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/AbstractTimeseriesCleanUpService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/PsqlTimeseriesCleanUpService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/TimescaleTimeseriesCleanUpService.java create mode 100644 common/dao-api/src/main/java/org/thingsboard/server/dao/util/PsqlTsDao.java diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql new file mode 100644 index 0000000000..0916c241a1 --- /dev/null +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql @@ -0,0 +1,123 @@ +-- +-- Copyright © 2016-2020 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_month varchar; + partition_day varchar; + partition_year varchar; + partition varchar; + partition_to_delete varchar; + + +BEGIN + SELECT max(attribute_kv.long_v) + FROM tenant + INNER JOIN attribute_kv ON tenant.id = attribute_kv.entity_id + WHERE attribute_kv.attribute_key = 'TTL' + into max_tenant_ttl; + SELECT max(attribute_kv.long_v) + FROM customer + INNER JOIN attribute_kv ON customer.id = attribute_kv.entity_id + WHERE attribute_kv.attribute_key = 'TTL' + into max_customer_ttl; + max_ttl := GREATEST(system_ttl, max_customer_ttl, max_tenant_ttl); + if max_ttl IS NOT NULL AND max_ttl > 0 THEN + date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - (max_ttl / 1000)); + partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; + IF partition_by_max_ttl_date IS NOT NULL THEN + CASE + WHEN partition_type = 'DAYS' THEN + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + WHEN partition_type = 'MONTHS' THEN + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + ELSE + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + END CASE; + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + LOOP + IF partition != partition_by_max_ttl_date THEN + IF partition_year IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN + partition_to_delete := partition; + ELSE + IF partition_month IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN + partition_to_delete := partition; + ELSE + IF partition_day IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN + partition_to_delete := partition; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + IF partition_to_delete IS NOT NULL THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; + EXECUTE format('DROP TABLE %I', partition_to_delete); + deleted := deleted + 1; + END IF; + END LOOP; + END IF; + END IF; +END +$$; + +CREATE OR REPLACE FUNCTION get_partition_by_max_ttl_date(IN partition_type varchar, IN date timestamp, OUT partition varchar) AS +$$ +BEGIN + CASE + WHEN partition_type = 'DAYS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM') || '_' || to_char(date, 'dd'); + WHEN partition_type = 'MONTHS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); + WHEN partition_type = 'YEARS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy'); + WHEN partition_type = 'INDEFINITE' THEN + partition := NULL; + ELSE + partition := NULL; + END CASE; + IF partition IS NOT NULL THEN + IF NOT EXISTS(SELECT + FROM pg_tables + WHERE schemaname = 'public' + AND tablename = partition) THEN + partition := NULL; + RAISE NOTICE 'Failed to found partition by ttl'; + END IF; + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql b/application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql new file mode 100644 index 0000000000..ff1fb5129b --- /dev/null +++ b/application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql @@ -0,0 +1,125 @@ +-- +-- Copyright © 2016-2020 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS +$$ +BEGIN + uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) || + '-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31), + IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE + tenant_cursor CURSOR FOR select tenant.id as tenant_id + from tenant; + tenant_id_record varchar; + customer_id_record varchar; + tenant_ttl bigint; + customer_ttl bigint; + deleted_for_entities bigint; + tenant_ttl_ts bigint; + customer_ttl_ts bigint; +BEGIN + OPEN tenant_cursor; + FETCH tenant_cursor INTO tenant_id_record; + WHILE FOUND + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + tenant_id_record, 'TTL') INTO tenant_ttl; + if tenant_ttl IS NULL THEN + tenant_ttl := system_ttl; + END IF; + IF tenant_ttl > 0 THEN + tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint; + deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record; + END IF; + FOR customer_id_record IN + SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + customer_id_record, 'TTL') INTO customer_ttl; + IF customer_ttl IS NULL THEN + customer_ttl_ts := tenant_ttl_ts; + ELSE + IF customer_ttl > 0 THEN + customer_ttl_ts := + (EXTRACT(EPOCH FROM current_timestamp) * 1000 - + customer_ttl::bigint * 1000)::bigint; + END IF; + END IF; + IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN + deleted_for_entities := + delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record; + deleted_for_entities := + delete_device_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, + customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + END IF; + END LOOP; + FETCH tenant_cursor INTO tenant_id_record; + END LOOP; +END +$$; diff --git a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java index 70e56f9489..ab03ca23ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java @@ -47,7 +47,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { @Autowired protected InstallScripts installScripts; - protected abstract void loadSql(Connection conn); + protected abstract void loadSql(Connection conn, String fileName); protected void loadFunctions(Path sqlFile, Connection conn) throws Exception { String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8); @@ -70,6 +70,26 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { return versionValid; } + protected boolean isOldSchema(Connection conn, long fromVersion) { + boolean isOldSchema = true; + try { + Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE IF NOT EXISTS tb_schema_settings ( schema_version bigint NOT NULL, CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version));"); + Thread.sleep(1000); + ResultSet resultSet = statement.executeQuery("SELECT schema_version FROM tb_schema_settings;"); + if (resultSet.next()) { + isOldSchema = resultSet.getLong(1) <= fromVersion; + } else { + resultSet.close(); + statement.execute("INSERT INTO tb_schema_settings (schema_version) VALUES (" + fromVersion + ")"); + } + statement.close(); + } catch (InterruptedException | SQLException e) { + log.info("Failed to check current PostgreSQL schema due to: {}", e.getMessage()); + } + return isOldSchema; + } + protected void executeQuery(Connection conn, String query) { try { Statement statement = conn.createStatement(); @@ -83,7 +103,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { nextWarning = nextWarning.getNextWarning(); } } - Thread.sleep(5000); + Thread.sleep(2000); log.info("Successfully executed query: {}", query); } catch (InterruptedException | SQLException e) { log.info("Failed to execute query: {} due to: {}", query, e.getMessage()); diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index a630540981..5f97a6eaa5 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -34,6 +34,8 @@ import java.sql.DriverManager; public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService { private static final String LOAD_FUNCTIONS_SQL = "schema_update_psql_ts.sql"; + private static final String LOAD_TTL_FUNCTIONS_SQL = "schema_update_ttl.sql"; + private static final String LOAD_DROP_PARTITIONS_FUNCTIONS_SQL = "schema_update_psql_drop_partitions.sql"; private static final String TS_KV_OLD = "ts_kv_old;"; private static final String TS_KV_LATEST_OLD = "ts_kv_latest_old;"; @@ -76,30 +78,39 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe throw new RuntimeException("PostgreSQL version should be at least more than 11, please upgrade your PostgreSQL and restart the script!"); } else { log.info("PostgreSQL version is valid!"); - log.info("Load upgrade functions ..."); - loadSql(conn); - log.info("Updating timeseries schema ..."); - executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); - executeQuery(conn, CALL_CREATE_PARTITIONS); - executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); - executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); - executeQuery(conn, CALL_INSERT_INTO_TS_KV); - executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE); - executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); + if (isOldSchema(conn, 2004003)) { + log.info("Load upgrade functions ..."); + loadSql(conn, LOAD_FUNCTIONS_SQL); + log.info("Updating timeseries schema ..."); + executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); + executeQuery(conn, CALL_CREATE_PARTITIONS); + executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); + executeQuery(conn, CALL_INSERT_INTO_TS_KV); + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE); + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); - executeQuery(conn, DROP_TABLE_TS_KV_OLD); - executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD); + executeQuery(conn, DROP_TABLE_TS_KV_OLD); + executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD); - executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE); - executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS); - executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); - executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE); + executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); + executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); - executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;"); - executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;"); + executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); + executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN IF NOT EXISTS json_v json;"); + } + + log.info("Load TTL functions ..."); + loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); + log.info("Load Drop Partitions functions ..."); + loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); + + executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); log.info("schema timeseries updated!"); } @@ -110,11 +121,12 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe } } - protected void loadSql(Connection conn) { - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL); + @Override + protected void loadSql(Connection conn, String fileName) { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); try { loadFunctions(schemaUpdateFile, conn); - log.info("Upgrade functions successfully loaded!"); + log.info("Functions successfully loaded!"); } catch (Exception e) { log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage()); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index 99afd59a50..21bdb500d9 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -39,6 +39,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr private long chunkTimeInterval; private static final String LOAD_FUNCTIONS_SQL = "schema_update_timescale_ts.sql"; + private static final String LOAD_TTL_FUNCTIONS_SQL = "schema_update_ttl.sql"; private static final String TENANT_TS_KV_OLD_TABLE = "tenant_ts_kv_old;"; @@ -79,31 +80,37 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr throw new RuntimeException("PostgreSQL version should be at least more than 11, please upgrade your PostgreSQL and restart the script!"); } else { log.info("PostgreSQL version is valid!"); - log.info("Load upgrade functions ..."); - loadSql(conn); - log.info("Updating timescale schema ..."); - executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE); - executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); + if (isOldSchema(conn, 2004003)) { + log.info("Load upgrade functions ..."); + loadSql(conn, LOAD_FUNCTIONS_SQL); + log.info("Updating timescale schema ..."); + executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE); + executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); - executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); + executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); - executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); - executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); - executeQuery(conn, CALL_INSERT_INTO_TS_KV); - executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); + executeQuery(conn, CALL_INSERT_INTO_TS_KV); + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); - executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE); + executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE); - executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE); - executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY); - executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE); + executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); - executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;"); - executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;"); + executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); + executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN IF NOT EXISTS json_v json;"); + } + log.info("Load TTL functions ..."); + loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); + + executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); log.info("schema timescale updated!"); } } @@ -113,13 +120,14 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr } } - protected void loadSql(Connection conn) { - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL); + @Override + protected void loadSql(Connection conn, String fileName) { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); try { loadFunctions(schemaUpdateFile, conn); - log.info("Upgrade functions successfully loaded!"); + log.info("Functions successfully loaded!"); } catch (Exception e) { - log.info("Failed to load Timescale upgrade functions due to: {}", e.getMessage()); + log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage()); } } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractTimeseriesCleanUpService.java new file mode 100644 index 0000000000..2399450d86 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractTimeseriesCleanUpService.java @@ -0,0 +1,89 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.thingsboard.server.dao.util.PsqlTsAnyDao; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; + +@PsqlTsAnyDao +@Slf4j +public abstract class AbstractTimeseriesCleanUpService { + + @Value("${sql.ttl.ts_key_value_ttl}") + protected long systemTtl; + + @Value("${sql.ttl.enabled}") + private boolean ttlTaskExecutionEnabled; + + @Value("${spring.datasource.url}") + private String dbUrl; + + @Value("${spring.datasource.username}") + private String dbUserName; + + @Value("${spring.datasource.password}") + private String dbPassword; + + @Scheduled(initialDelayString = "${sql.ttl.execution_interval_ms}", fixedDelayString = "${sql.ttl.execution_interval_ms}") + public void cleanUp() { + if (ttlTaskExecutionEnabled) { + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + doCleanUp(conn); + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + } + } + } + + protected abstract void doCleanUp(Connection connection); + + protected long executeQuery(Connection conn, String query) { + long removed = 0L; + try { + Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery(query); + getWarnings(statement); + resultSet.next(); + removed = resultSet.getLong(1); + log.debug("Successfully executed query: {}", query); + } catch (SQLException e) { + log.debug("Failed to execute query: {} due to: {}", query, e.getMessage()); + } + return removed; + } + + private void getWarnings(Statement statement) throws SQLException { + SQLWarning warnings = statement.getWarnings(); + if (warnings != null) { + log.debug("{}", warnings.getMessage()); + SQLWarning nextWarning = warnings.getNextWarning(); + while (nextWarning != null) { + log.debug("{}", nextWarning.getMessage()); + nextWarning = nextWarning.getNextWarning(); + } + } + } + +} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/PsqlTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/PsqlTimeseriesCleanUpService.java new file mode 100644 index 0000000000..ab344dc518 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/PsqlTimeseriesCleanUpService.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.util.PsqlTsDao; + +import java.sql.Connection; + +@PsqlTsDao +@Service +@Slf4j +public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { + + @Value("${sql.postgres.ts_key_value_partitioning}") + private String partitionType; + + @Override + protected void doCleanUp(Connection connection) { + long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);"); + log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved); + long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);"); + log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); + } +} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/TimescaleTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/TimescaleTimeseriesCleanUpService.java new file mode 100644 index 0000000000..1dbdb4ad55 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/TimescaleTimeseriesCleanUpService.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.util.TimescaleDBTsDao; + +import java.sql.Connection; + +@TimescaleDBTsDao +@Service +@Slf4j +public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { + + @Override + protected void doCleanUp(Connection connection) { + long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);"); + log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); + } +} \ No newline at end of file diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 820fe14be1..0402c91387 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -216,6 +216,10 @@ sql: timescale: # Specify Interval size for new data chunks storage. chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" + ttl: + enabled: "${SQL_TTL_ENABLED:true}" + execution_interval_ms: "${SQL_TTL_EXECUTION_INTERVAL:86400000}" # Number of miliseconds + ts_key_value_ttl: "${SQL_TTL_TS_KEY_VALUE_TTL:0}" # Number of seconds # Actor system parameters actors: diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/util/PsqlTsDao.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/PsqlTsDao.java new file mode 100644 index 0000000000..cc0d9051e5 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/PsqlTsDao.java @@ -0,0 +1,21 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; + +@ConditionalOnExpression("'${database.ts.type}'=='sql' && '${spring.jpa.database-platform}'=='org.hibernate.dialect.PostgreSQLDialect'") +public @interface PsqlTsDao { } \ No newline at end of file diff --git a/dao/src/main/resources/sql/schema-timescale.sql b/dao/src/main/resources/sql/schema-timescale.sql index b95c8b86ba..32e2a78620 100644 --- a/dao/src/main/resources/sql/schema-timescale.sql +++ b/dao/src/main/resources/sql/schema-timescale.sql @@ -44,4 +44,122 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( dbl_v double precision, json_v json, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS tb_schema_settings +( + schema_version bigint NOT NULL, + CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) +); + +INSERT INTO tb_schema_settings (schema_version) VALUES (2005000); + +CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS +$$ +BEGIN + uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) || + '-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31), + IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE + tenant_cursor CURSOR FOR select tenant.id as tenant_id + from tenant; + tenant_id_record varchar; + customer_id_record varchar; + tenant_ttl bigint; + customer_ttl bigint; + deleted_for_entities bigint; + tenant_ttl_ts bigint; + customer_ttl_ts bigint; +BEGIN + OPEN tenant_cursor; + FETCH tenant_cursor INTO tenant_id_record; + WHILE FOUND + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + tenant_id_record, 'TTL') INTO tenant_ttl; + if tenant_ttl IS NULL THEN + tenant_ttl := system_ttl; + END IF; + IF tenant_ttl > 0 THEN + tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint; + deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record; + END IF; + FOR customer_id_record IN + SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + customer_id_record, 'TTL') INTO customer_ttl; + IF customer_ttl IS NULL THEN + customer_ttl_ts := tenant_ttl_ts; + ELSE + IF customer_ttl > 0 THEN + customer_ttl_ts := + (EXTRACT(EPOCH FROM current_timestamp) * 1000 - + customer_ttl::bigint * 1000)::bigint; + END IF; + END IF; + IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN + deleted_for_entities := + delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record; + deleted_for_entities := + delete_device_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, + customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + END IF; + END LOOP; + FETCH tenant_cursor INTO tenant_id_record; + END LOOP; +END +$$; diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index 32b6762c8e..3789444106 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -14,31 +14,260 @@ -- limitations under the License. -- -CREATE TABLE IF NOT EXISTS ts_kv ( - entity_id uuid NOT NULL, - key int NOT NULL, - ts bigint NOT NULL, - bool_v boolean, - str_v varchar(10000000), - long_v bigint, - dbl_v double precision, - json_v json +CREATE TABLE IF NOT EXISTS ts_kv +( + entity_id uuid NOT NULL, + key int NOT NULL, + ts bigint NOT NULL, + bool_v boolean, + str_v varchar(10000000), + long_v bigint, + dbl_v double precision, + json_v json ) PARTITION BY RANGE (ts); -CREATE TABLE IF NOT EXISTS ts_kv_latest ( - entity_id uuid NOT NULL, - key int NOT NULL, - ts bigint NOT NULL, - bool_v boolean, - str_v varchar(10000000), - long_v bigint, - dbl_v double precision, - json_v json, +CREATE TABLE IF NOT EXISTS ts_kv_latest +( + entity_id uuid NOT NULL, + key int NOT NULL, + ts bigint NOT NULL, + bool_v boolean, + str_v varchar(10000000), + long_v bigint, + dbl_v double precision, + json_v json, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); -CREATE TABLE IF NOT EXISTS ts_kv_dictionary ( - key varchar(255) NOT NULL, +CREATE TABLE IF NOT EXISTS ts_kv_dictionary +( + key varchar(255) NOT NULL, key_id serial UNIQUE, CONSTRAINT ts_key_id_pkey PRIMARY KEY (key) -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS tb_schema_settings +( + schema_version bigint NOT NULL, + CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) +); + +INSERT INTO tb_schema_settings (schema_version) VALUES (2005000); + +CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_month varchar; + partition_day varchar; + partition_year varchar; + partition varchar; + partition_to_delete varchar; + + +BEGIN + SELECT max(attribute_kv.long_v) + FROM tenant + INNER JOIN attribute_kv ON tenant.id = attribute_kv.entity_id + WHERE attribute_kv.attribute_key = 'TTL' + into max_tenant_ttl; + SELECT max(attribute_kv.long_v) + FROM customer + INNER JOIN attribute_kv ON customer.id = attribute_kv.entity_id + WHERE attribute_kv.attribute_key = 'TTL' + into max_customer_ttl; + max_ttl := GREATEST(system_ttl, max_customer_ttl, max_tenant_ttl); + if max_ttl IS NOT NULL AND max_ttl > 0 THEN + date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - (max_ttl / 1000)); + partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; + IF partition_by_max_ttl_date IS NOT NULL THEN + CASE + WHEN partition_type = 'DAYS' THEN + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + WHEN partition_type = 'MONTHS' THEN + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + ELSE + partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + END CASE; + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + LOOP + IF partition != partition_by_max_ttl_date THEN + IF partition_year IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN + partition_to_delete := partition; + ELSE + IF partition_month IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN + partition_to_delete := partition; + ELSE + IF partition_day IS NOT NULL THEN + IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN + partition_to_delete := partition; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + IF partition_to_delete IS NOT NULL THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; + EXECUTE format('DROP TABLE %I', partition_to_delete); + deleted := deleted + 1; + END IF; + END LOOP; + END IF; + END IF; +END +$$; + +CREATE OR REPLACE FUNCTION get_partition_by_max_ttl_date(IN partition_type varchar, IN date timestamp, OUT partition varchar) AS +$$ +BEGIN + CASE + WHEN partition_type = 'DAYS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM') || '_' || to_char(date, 'dd'); + WHEN partition_type = 'MONTHS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); + WHEN partition_type = 'YEARS' THEN + partition := 'ts_kv_' || to_char(date, 'yyyy'); + WHEN partition_type = 'INDEFINITE' THEN + partition := NULL; + ELSE + partition := NULL; + END CASE; + IF partition IS NOT NULL THEN + IF NOT EXISTS(SELECT + FROM pg_tables + WHERE schemaname = 'public' + AND tablename = partition) THEN + partition := NULL; + RAISE NOTICE 'Failed to found partition by ttl'; + END IF; + END IF; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS +$$ +BEGIN + uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) || + '-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint, + OUT deleted bigint) AS +$$ +BEGIN + EXECUTE format( + 'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', + tenant_id, customer_id, ttl) into deleted; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31), + IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE + tenant_cursor CURSOR FOR select tenant.id as tenant_id + from tenant; + tenant_id_record varchar; + customer_id_record varchar; + tenant_ttl bigint; + customer_ttl bigint; + deleted_for_entities bigint; + tenant_ttl_ts bigint; + customer_ttl_ts bigint; +BEGIN + OPEN tenant_cursor; + FETCH tenant_cursor INTO tenant_id_record; + WHILE FOUND + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + tenant_id_record, 'TTL') INTO tenant_ttl; + if tenant_ttl IS NULL THEN + tenant_ttl := system_ttl; + END IF; + IF tenant_ttl > 0 THEN + tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint; + deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record; + END IF; + FOR customer_id_record IN + SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + customer_id_record, 'TTL') INTO customer_ttl; + IF customer_ttl IS NULL THEN + customer_ttl_ts := tenant_ttl_ts; + ELSE + IF customer_ttl > 0 THEN + customer_ttl_ts := + (EXTRACT(EPOCH FROM current_timestamp) * 1000 - + customer_ttl::bigint * 1000)::bigint; + END IF; + END IF; + IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN + deleted_for_entities := + delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record; + deleted_for_entities := + delete_device_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, + customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + END IF; + END LOOP; + FETCH tenant_cursor INTO tenant_id_record; + END LOOP; +END +$$; From 8133cf4623ee1253dada7d42bf85c6101d181cfe Mon Sep 17 00:00:00 2001 From: Dmytro Shvaika Date: Thu, 9 Apr 2020 16:44:18 +0300 Subject: [PATCH 2/2] add sendActivationEmail as request param (default -> true) to activate method in auth controller and login service --- .../thingsboard/server/controller/AuthController.java | 11 +++++++---- ui/src/app/api/login.service.js | 5 ++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/AuthController.java b/application/src/main/java/org/thingsboard/server/controller/AuthController.java index 0cb9b29501..44449eb3d6 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AuthController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AuthController.java @@ -199,6 +199,7 @@ public class AuthController extends BaseController { @ResponseBody public JsonNode activateUser( @RequestBody JsonNode activateRequest, + @RequestParam(required = false, defaultValue = "true") boolean sendActivationMail, HttpServletRequest request) throws ThingsboardException { try { String activateToken = activateRequest.get("activateToken").asText(); @@ -213,10 +214,12 @@ public class AuthController extends BaseController { String loginUrl = String.format("%s/login", baseUrl); String email = user.getEmail(); - try { - mailService.sendAccountActivatedEmail(loginUrl, email); - } catch (Exception e) { - log.info("Unable to send account activation email [{}]", e.getMessage()); + if (sendActivationMail) { + try { + mailService.sendAccountActivatedEmail(loginUrl, email); + } catch (Exception e) { + log.info("Unable to send account activation email [{}]", e.getMessage()); + } } JwtToken accessToken = tokenFactory.createAccessJwtToken(securityUser); diff --git a/ui/src/app/api/login.service.js b/ui/src/app/api/login.service.js index 2322c13b63..0707070ed5 100644 --- a/ui/src/app/api/login.service.js +++ b/ui/src/app/api/login.service.js @@ -85,9 +85,12 @@ function LoginService($http, $q) { return deferred.promise; } - function activate(activateToken, password) { + function activate(activateToken, password, sendActivationMail) { var deferred = $q.defer(); var url = '/api/noauth/activate'; + if(sendActivationMail === true || sendActivationMail === false) { + url += '?sendActivationMail=' + sendActivationMail; + } $http.post(url, {activateToken: activateToken, password: password}).then(function success(response) { deferred.resolve(response); }, function fail() {