Merge branch 'improvements/userActivate' into develop/3.0
This commit is contained in:
commit
089dfe5402
@ -0,0 +1,123 @@
|
||||
--
|
||||
-- Copyright © 2016-2020 The Thingsboard Authors
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
max_tenant_ttl bigint;
|
||||
max_customer_ttl bigint;
|
||||
max_ttl bigint;
|
||||
date timestamp;
|
||||
partition_by_max_ttl_date varchar;
|
||||
partition_month varchar;
|
||||
partition_day varchar;
|
||||
partition_year varchar;
|
||||
partition varchar;
|
||||
partition_to_delete varchar;
|
||||
|
||||
|
||||
BEGIN
|
||||
SELECT max(attribute_kv.long_v)
|
||||
FROM tenant
|
||||
INNER JOIN attribute_kv ON tenant.id = attribute_kv.entity_id
|
||||
WHERE attribute_kv.attribute_key = 'TTL'
|
||||
into max_tenant_ttl;
|
||||
SELECT max(attribute_kv.long_v)
|
||||
FROM customer
|
||||
INNER JOIN attribute_kv ON customer.id = attribute_kv.entity_id
|
||||
WHERE attribute_kv.attribute_key = 'TTL'
|
||||
into max_customer_ttl;
|
||||
max_ttl := GREATEST(system_ttl, max_customer_ttl, max_tenant_ttl);
|
||||
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
||||
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - (max_ttl / 1000));
|
||||
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
||||
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
||||
IF partition_by_max_ttl_date IS NOT NULL THEN
|
||||
CASE
|
||||
WHEN partition_type = 'DAYS' THEN
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||
partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
||||
WHEN partition_type = 'MONTHS' THEN
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||
ELSE
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
END CASE;
|
||||
FOR partition IN SELECT tablename
|
||||
FROM pg_tables
|
||||
WHERE schemaname = 'public'
|
||||
AND tablename like 'ts_kv_' || '%'
|
||||
AND tablename != 'ts_kv_latest'
|
||||
AND tablename != 'ts_kv_dictionary'
|
||||
LOOP
|
||||
IF partition != partition_by_max_ttl_date THEN
|
||||
IF partition_year IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
|
||||
partition_to_delete := partition;
|
||||
ELSE
|
||||
IF partition_month IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
|
||||
partition_to_delete := partition;
|
||||
ELSE
|
||||
IF partition_day IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
|
||||
partition_to_delete := partition;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
IF partition_to_delete IS NOT NULL THEN
|
||||
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
|
||||
EXECUTE format('DROP TABLE %I', partition_to_delete);
|
||||
deleted := deleted + 1;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_partition_by_max_ttl_date(IN partition_type varchar, IN date timestamp, OUT partition varchar) AS
|
||||
$$
|
||||
BEGIN
|
||||
CASE
|
||||
WHEN partition_type = 'DAYS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM') || '_' || to_char(date, 'dd');
|
||||
WHEN partition_type = 'MONTHS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
||||
WHEN partition_type = 'YEARS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
||||
WHEN partition_type = 'INDEFINITE' THEN
|
||||
partition := NULL;
|
||||
ELSE
|
||||
partition := NULL;
|
||||
END CASE;
|
||||
IF partition IS NOT NULL THEN
|
||||
IF NOT EXISTS(SELECT
|
||||
FROM pg_tables
|
||||
WHERE schemaname = 'public'
|
||||
AND tablename = partition) THEN
|
||||
partition := NULL;
|
||||
RAISE NOTICE 'Failed to found partition by ttl';
|
||||
END IF;
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
125
application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql
Normal file
125
application/src/main/data/upgrade/2.4.3/schema_update_ttl.sql
Normal file
@ -0,0 +1,125 @@
|
||||
--
|
||||
-- Copyright © 2016-2020 The Thingsboard Authors
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
|
||||
$$
|
||||
BEGIN
|
||||
uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) ||
|
||||
'-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31),
|
||||
IN system_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
tenant_cursor CURSOR FOR select tenant.id as tenant_id
|
||||
from tenant;
|
||||
tenant_id_record varchar;
|
||||
customer_id_record varchar;
|
||||
tenant_ttl bigint;
|
||||
customer_ttl bigint;
|
||||
deleted_for_entities bigint;
|
||||
tenant_ttl_ts bigint;
|
||||
customer_ttl_ts bigint;
|
||||
BEGIN
|
||||
OPEN tenant_cursor;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
WHILE FOUND
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
tenant_id_record, 'TTL') INTO tenant_ttl;
|
||||
if tenant_ttl IS NULL THEN
|
||||
tenant_ttl := system_ttl;
|
||||
END IF;
|
||||
IF tenant_ttl > 0 THEN
|
||||
tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint;
|
||||
deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
END IF;
|
||||
FOR customer_id_record IN
|
||||
SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
customer_id_record, 'TTL') INTO customer_ttl;
|
||||
IF customer_ttl IS NULL THEN
|
||||
customer_ttl_ts := tenant_ttl_ts;
|
||||
ELSE
|
||||
IF customer_ttl > 0 THEN
|
||||
customer_ttl_ts :=
|
||||
(EXTRACT(EPOCH FROM current_timestamp) * 1000 -
|
||||
customer_ttl::bigint * 1000)::bigint;
|
||||
END IF;
|
||||
END IF;
|
||||
IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN
|
||||
deleted_for_entities :=
|
||||
delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record;
|
||||
deleted_for_entities :=
|
||||
delete_device_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record,
|
||||
customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
END IF;
|
||||
END LOOP;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
END LOOP;
|
||||
END
|
||||
$$;
|
||||
@ -199,6 +199,7 @@ public class AuthController extends BaseController {
|
||||
@ResponseBody
|
||||
public JsonNode activateUser(
|
||||
@RequestBody JsonNode activateRequest,
|
||||
@RequestParam(required = false, defaultValue = "true") boolean sendActivationMail,
|
||||
HttpServletRequest request) throws ThingsboardException {
|
||||
try {
|
||||
String activateToken = activateRequest.get("activateToken").asText();
|
||||
@ -213,10 +214,12 @@ public class AuthController extends BaseController {
|
||||
String loginUrl = String.format("%s/login", baseUrl);
|
||||
String email = user.getEmail();
|
||||
|
||||
try {
|
||||
mailService.sendAccountActivatedEmail(loginUrl, email);
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to send account activation email [{}]", e.getMessage());
|
||||
if (sendActivationMail) {
|
||||
try {
|
||||
mailService.sendAccountActivatedEmail(loginUrl, email);
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to send account activation email [{}]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
JwtToken accessToken = tokenFactory.createAccessJwtToken(securityUser);
|
||||
|
||||
@ -47,7 +47,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
|
||||
@Autowired
|
||||
protected InstallScripts installScripts;
|
||||
|
||||
protected abstract void loadSql(Connection conn);
|
||||
protected abstract void loadSql(Connection conn, String fileName);
|
||||
|
||||
protected void loadFunctions(Path sqlFile, Connection conn) throws Exception {
|
||||
String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8);
|
||||
@ -70,6 +70,26 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
|
||||
return versionValid;
|
||||
}
|
||||
|
||||
protected boolean isOldSchema(Connection conn, long fromVersion) {
|
||||
boolean isOldSchema = true;
|
||||
try {
|
||||
Statement statement = conn.createStatement();
|
||||
statement.execute("CREATE TABLE IF NOT EXISTS tb_schema_settings ( schema_version bigint NOT NULL, CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version));");
|
||||
Thread.sleep(1000);
|
||||
ResultSet resultSet = statement.executeQuery("SELECT schema_version FROM tb_schema_settings;");
|
||||
if (resultSet.next()) {
|
||||
isOldSchema = resultSet.getLong(1) <= fromVersion;
|
||||
} else {
|
||||
resultSet.close();
|
||||
statement.execute("INSERT INTO tb_schema_settings (schema_version) VALUES (" + fromVersion + ")");
|
||||
}
|
||||
statement.close();
|
||||
} catch (InterruptedException | SQLException e) {
|
||||
log.info("Failed to check current PostgreSQL schema due to: {}", e.getMessage());
|
||||
}
|
||||
return isOldSchema;
|
||||
}
|
||||
|
||||
protected void executeQuery(Connection conn, String query) {
|
||||
try {
|
||||
Statement statement = conn.createStatement();
|
||||
@ -83,7 +103,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
|
||||
nextWarning = nextWarning.getNextWarning();
|
||||
}
|
||||
}
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
log.info("Successfully executed query: {}", query);
|
||||
} catch (InterruptedException | SQLException e) {
|
||||
log.info("Failed to execute query: {} due to: {}", query, e.getMessage());
|
||||
|
||||
@ -34,6 +34,8 @@ import java.sql.DriverManager;
|
||||
public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService {
|
||||
|
||||
private static final String LOAD_FUNCTIONS_SQL = "schema_update_psql_ts.sql";
|
||||
private static final String LOAD_TTL_FUNCTIONS_SQL = "schema_update_ttl.sql";
|
||||
private static final String LOAD_DROP_PARTITIONS_FUNCTIONS_SQL = "schema_update_psql_drop_partitions.sql";
|
||||
|
||||
private static final String TS_KV_OLD = "ts_kv_old;";
|
||||
private static final String TS_KV_LATEST_OLD = "ts_kv_latest_old;";
|
||||
@ -76,30 +78,39 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
|
||||
throw new RuntimeException("PostgreSQL version should be at least more than 11, please upgrade your PostgreSQL and restart the script!");
|
||||
} else {
|
||||
log.info("PostgreSQL version is valid!");
|
||||
log.info("Load upgrade functions ...");
|
||||
loadSql(conn);
|
||||
log.info("Updating timeseries schema ...");
|
||||
executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
|
||||
executeQuery(conn, CALL_CREATE_PARTITIONS);
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
|
||||
if (isOldSchema(conn, 2004003)) {
|
||||
log.info("Load upgrade functions ...");
|
||||
loadSql(conn, LOAD_FUNCTIONS_SQL);
|
||||
log.info("Updating timeseries schema ...");
|
||||
executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
|
||||
executeQuery(conn, CALL_CREATE_PARTITIONS);
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
|
||||
|
||||
executeQuery(conn, DROP_TABLE_TS_KV_OLD);
|
||||
executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD);
|
||||
executeQuery(conn, DROP_TABLE_TS_KV_OLD);
|
||||
executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD);
|
||||
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
|
||||
|
||||
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN IF NOT EXISTS json_v json;");
|
||||
}
|
||||
|
||||
log.info("Load TTL functions ...");
|
||||
loadSql(conn, LOAD_TTL_FUNCTIONS_SQL);
|
||||
log.info("Load Drop Partitions functions ...");
|
||||
loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL);
|
||||
|
||||
executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000");
|
||||
|
||||
log.info("schema timeseries updated!");
|
||||
}
|
||||
@ -110,11 +121,12 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
|
||||
}
|
||||
}
|
||||
|
||||
protected void loadSql(Connection conn) {
|
||||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL);
|
||||
@Override
|
||||
protected void loadSql(Connection conn, String fileName) {
|
||||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);
|
||||
try {
|
||||
loadFunctions(schemaUpdateFile, conn);
|
||||
log.info("Upgrade functions successfully loaded!");
|
||||
log.info("Functions successfully loaded!");
|
||||
} catch (Exception e) {
|
||||
log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage());
|
||||
}
|
||||
|
||||
@ -39,6 +39,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
|
||||
private long chunkTimeInterval;
|
||||
|
||||
private static final String LOAD_FUNCTIONS_SQL = "schema_update_timescale_ts.sql";
|
||||
private static final String LOAD_TTL_FUNCTIONS_SQL = "schema_update_ttl.sql";
|
||||
|
||||
private static final String TENANT_TS_KV_OLD_TABLE = "tenant_ts_kv_old;";
|
||||
|
||||
@ -79,31 +80,37 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
|
||||
throw new RuntimeException("PostgreSQL version should be at least more than 11, please upgrade your PostgreSQL and restart the script!");
|
||||
} else {
|
||||
log.info("PostgreSQL version is valid!");
|
||||
log.info("Load upgrade functions ...");
|
||||
loadSql(conn);
|
||||
log.info("Updating timescale schema ...");
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE);
|
||||
if (isOldSchema(conn, 2004003)) {
|
||||
log.info("Load upgrade functions ...");
|
||||
loadSql(conn, LOAD_FUNCTIONS_SQL);
|
||||
log.info("Updating timescale schema ...");
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE);
|
||||
|
||||
executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
|
||||
executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
|
||||
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
|
||||
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
|
||||
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
|
||||
|
||||
executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE);
|
||||
executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE);
|
||||
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY);
|
||||
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV);
|
||||
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
|
||||
|
||||
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
|
||||
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN IF NOT EXISTS json_v json;");
|
||||
}
|
||||
|
||||
log.info("Load TTL functions ...");
|
||||
loadSql(conn, LOAD_TTL_FUNCTIONS_SQL);
|
||||
|
||||
executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000");
|
||||
log.info("schema timescale updated!");
|
||||
}
|
||||
}
|
||||
@ -113,13 +120,14 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
|
||||
}
|
||||
}
|
||||
|
||||
protected void loadSql(Connection conn) {
|
||||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL);
|
||||
@Override
|
||||
protected void loadSql(Connection conn, String fileName) {
|
||||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);
|
||||
try {
|
||||
loadFunctions(schemaUpdateFile, conn);
|
||||
log.info("Upgrade functions successfully loaded!");
|
||||
log.info("Functions successfully loaded!");
|
||||
} catch (Exception e) {
|
||||
log.info("Failed to load Timescale upgrade functions due to: {}", e.getMessage());
|
||||
log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,89 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.ttl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.thingsboard.server.dao.util.PsqlTsAnyDao;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLWarning;
|
||||
import java.sql.Statement;
|
||||
|
||||
@PsqlTsAnyDao
|
||||
@Slf4j
|
||||
public abstract class AbstractTimeseriesCleanUpService {
|
||||
|
||||
@Value("${sql.ttl.ts_key_value_ttl}")
|
||||
protected long systemTtl;
|
||||
|
||||
@Value("${sql.ttl.enabled}")
|
||||
private boolean ttlTaskExecutionEnabled;
|
||||
|
||||
@Value("${spring.datasource.url}")
|
||||
private String dbUrl;
|
||||
|
||||
@Value("${spring.datasource.username}")
|
||||
private String dbUserName;
|
||||
|
||||
@Value("${spring.datasource.password}")
|
||||
private String dbPassword;
|
||||
|
||||
@Scheduled(initialDelayString = "${sql.ttl.execution_interval_ms}", fixedDelayString = "${sql.ttl.execution_interval_ms}")
|
||||
public void cleanUp() {
|
||||
if (ttlTaskExecutionEnabled) {
|
||||
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
|
||||
doCleanUp(conn);
|
||||
} catch (SQLException e) {
|
||||
log.error("SQLException occurred during TTL task execution ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doCleanUp(Connection connection);
|
||||
|
||||
protected long executeQuery(Connection conn, String query) {
|
||||
long removed = 0L;
|
||||
try {
|
||||
Statement statement = conn.createStatement();
|
||||
ResultSet resultSet = statement.executeQuery(query);
|
||||
getWarnings(statement);
|
||||
resultSet.next();
|
||||
removed = resultSet.getLong(1);
|
||||
log.debug("Successfully executed query: {}", query);
|
||||
} catch (SQLException e) {
|
||||
log.debug("Failed to execute query: {} due to: {}", query, e.getMessage());
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
private void getWarnings(Statement statement) throws SQLException {
|
||||
SQLWarning warnings = statement.getWarnings();
|
||||
if (warnings != null) {
|
||||
log.debug("{}", warnings.getMessage());
|
||||
SQLWarning nextWarning = warnings.getNextWarning();
|
||||
while (nextWarning != null) {
|
||||
log.debug("{}", nextWarning.getMessage());
|
||||
nextWarning = nextWarning.getNextWarning();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.ttl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.util.PsqlTsDao;
|
||||
|
||||
import java.sql.Connection;
|
||||
|
||||
@PsqlTsDao
|
||||
@Service
|
||||
@Slf4j
|
||||
public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
|
||||
|
||||
@Value("${sql.postgres.ts_key_value_partitioning}")
|
||||
private String partitionType;
|
||||
|
||||
@Override
|
||||
protected void doCleanUp(Connection connection) {
|
||||
long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);");
|
||||
log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved);
|
||||
long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);");
|
||||
log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.ttl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
|
||||
|
||||
import java.sql.Connection;
|
||||
|
||||
@TimescaleDBTsDao
|
||||
@Service
|
||||
@Slf4j
|
||||
public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
|
||||
|
||||
@Override
|
||||
protected void doCleanUp(Connection connection) {
|
||||
long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);");
|
||||
log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
|
||||
}
|
||||
}
|
||||
@ -214,6 +214,10 @@ sql:
|
||||
timescale:
|
||||
# Specify Interval size for new data chunks storage.
|
||||
chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
|
||||
ttl:
|
||||
enabled: "${SQL_TTL_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_EXECUTION_INTERVAL:86400000}" # Number of miliseconds
|
||||
ts_key_value_ttl: "${SQL_TTL_TS_KEY_VALUE_TTL:0}" # Number of seconds
|
||||
|
||||
# Actor system parameters
|
||||
actors:
|
||||
|
||||
@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.util;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
|
||||
@ConditionalOnExpression("'${database.ts.type}'=='sql' && '${spring.jpa.database-platform}'=='org.hibernate.dialect.PostgreSQLDialect'")
|
||||
public @interface PsqlTsDao { }
|
||||
@ -45,3 +45,121 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
||||
json_v json,
|
||||
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tb_schema_settings
|
||||
(
|
||||
schema_version bigint NOT NULL,
|
||||
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
||||
);
|
||||
|
||||
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000);
|
||||
|
||||
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
|
||||
$$
|
||||
BEGIN
|
||||
uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) ||
|
||||
'-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31),
|
||||
IN system_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
tenant_cursor CURSOR FOR select tenant.id as tenant_id
|
||||
from tenant;
|
||||
tenant_id_record varchar;
|
||||
customer_id_record varchar;
|
||||
tenant_ttl bigint;
|
||||
customer_ttl bigint;
|
||||
deleted_for_entities bigint;
|
||||
tenant_ttl_ts bigint;
|
||||
customer_ttl_ts bigint;
|
||||
BEGIN
|
||||
OPEN tenant_cursor;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
WHILE FOUND
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
tenant_id_record, 'TTL') INTO tenant_ttl;
|
||||
if tenant_ttl IS NULL THEN
|
||||
tenant_ttl := system_ttl;
|
||||
END IF;
|
||||
IF tenant_ttl > 0 THEN
|
||||
tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint;
|
||||
deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
END IF;
|
||||
FOR customer_id_record IN
|
||||
SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
customer_id_record, 'TTL') INTO customer_ttl;
|
||||
IF customer_ttl IS NULL THEN
|
||||
customer_ttl_ts := tenant_ttl_ts;
|
||||
ELSE
|
||||
IF customer_ttl > 0 THEN
|
||||
customer_ttl_ts :=
|
||||
(EXTRACT(EPOCH FROM current_timestamp) * 1000 -
|
||||
customer_ttl::bigint * 1000)::bigint;
|
||||
END IF;
|
||||
END IF;
|
||||
IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN
|
||||
deleted_for_entities :=
|
||||
delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record;
|
||||
deleted_for_entities :=
|
||||
delete_device_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record,
|
||||
customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
END IF;
|
||||
END LOOP;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
END LOOP;
|
||||
END
|
||||
$$;
|
||||
|
||||
@ -14,31 +14,260 @@
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ts_kv (
|
||||
entity_id uuid NOT NULL,
|
||||
key int NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v varchar(10000000),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
json_v json
|
||||
CREATE TABLE IF NOT EXISTS ts_kv
|
||||
(
|
||||
entity_id uuid NOT NULL,
|
||||
key int NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v varchar(10000000),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
json_v json
|
||||
) PARTITION BY RANGE (ts);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
||||
entity_id uuid NOT NULL,
|
||||
key int NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v varchar(10000000),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
json_v json,
|
||||
CREATE TABLE IF NOT EXISTS ts_kv_latest
|
||||
(
|
||||
entity_id uuid NOT NULL,
|
||||
key int NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v varchar(10000000),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
json_v json,
|
||||
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
|
||||
key varchar(255) NOT NULL,
|
||||
CREATE TABLE IF NOT EXISTS ts_kv_dictionary
|
||||
(
|
||||
key varchar(255) NOT NULL,
|
||||
key_id serial UNIQUE,
|
||||
CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tb_schema_settings
|
||||
(
|
||||
schema_version bigint NOT NULL,
|
||||
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
||||
);
|
||||
|
||||
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
max_tenant_ttl bigint;
|
||||
max_customer_ttl bigint;
|
||||
max_ttl bigint;
|
||||
date timestamp;
|
||||
partition_by_max_ttl_date varchar;
|
||||
partition_month varchar;
|
||||
partition_day varchar;
|
||||
partition_year varchar;
|
||||
partition varchar;
|
||||
partition_to_delete varchar;
|
||||
|
||||
|
||||
BEGIN
|
||||
SELECT max(attribute_kv.long_v)
|
||||
FROM tenant
|
||||
INNER JOIN attribute_kv ON tenant.id = attribute_kv.entity_id
|
||||
WHERE attribute_kv.attribute_key = 'TTL'
|
||||
into max_tenant_ttl;
|
||||
SELECT max(attribute_kv.long_v)
|
||||
FROM customer
|
||||
INNER JOIN attribute_kv ON customer.id = attribute_kv.entity_id
|
||||
WHERE attribute_kv.attribute_key = 'TTL'
|
||||
into max_customer_ttl;
|
||||
max_ttl := GREATEST(system_ttl, max_customer_ttl, max_tenant_ttl);
|
||||
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
|
||||
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - (max_ttl / 1000));
|
||||
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
|
||||
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
|
||||
IF partition_by_max_ttl_date IS NOT NULL THEN
|
||||
CASE
|
||||
WHEN partition_type = 'DAYS' THEN
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||
partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
|
||||
WHEN partition_type = 'MONTHS' THEN
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
|
||||
ELSE
|
||||
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
|
||||
END CASE;
|
||||
FOR partition IN SELECT tablename
|
||||
FROM pg_tables
|
||||
WHERE schemaname = 'public'
|
||||
AND tablename like 'ts_kv_' || '%'
|
||||
AND tablename != 'ts_kv_latest'
|
||||
AND tablename != 'ts_kv_dictionary'
|
||||
LOOP
|
||||
IF partition != partition_by_max_ttl_date THEN
|
||||
IF partition_year IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
|
||||
partition_to_delete := partition;
|
||||
ELSE
|
||||
IF partition_month IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
|
||||
partition_to_delete := partition;
|
||||
ELSE
|
||||
IF partition_day IS NOT NULL THEN
|
||||
IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
|
||||
partition_to_delete := partition;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
END IF;
|
||||
IF partition_to_delete IS NOT NULL THEN
|
||||
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
|
||||
EXECUTE format('DROP TABLE %I', partition_to_delete);
|
||||
deleted := deleted + 1;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_partition_by_max_ttl_date(IN partition_type varchar, IN date timestamp, OUT partition varchar) AS
|
||||
$$
|
||||
BEGIN
|
||||
CASE
|
||||
WHEN partition_type = 'DAYS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM') || '_' || to_char(date, 'dd');
|
||||
WHEN partition_type = 'MONTHS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
|
||||
WHEN partition_type = 'YEARS' THEN
|
||||
partition := 'ts_kv_' || to_char(date, 'yyyy');
|
||||
WHEN partition_type = 'INDEFINITE' THEN
|
||||
partition := NULL;
|
||||
ELSE
|
||||
partition := NULL;
|
||||
END CASE;
|
||||
IF partition IS NOT NULL THEN
|
||||
IF NOT EXISTS(SELECT
|
||||
FROM pg_tables
|
||||
WHERE schemaname = 'public'
|
||||
AND tablename = partition) THEN
|
||||
partition := NULL;
|
||||
RAISE NOTICE 'Failed to found partition by ttl';
|
||||
END IF;
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
|
||||
$$
|
||||
BEGIN
|
||||
uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) ||
|
||||
'-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
|
||||
OUT deleted bigint) AS
|
||||
$$
|
||||
BEGIN
|
||||
EXECUTE format(
|
||||
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
|
||||
tenant_id, customer_id, ttl) into deleted;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31),
|
||||
IN system_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
tenant_cursor CURSOR FOR select tenant.id as tenant_id
|
||||
from tenant;
|
||||
tenant_id_record varchar;
|
||||
customer_id_record varchar;
|
||||
tenant_ttl bigint;
|
||||
customer_ttl bigint;
|
||||
deleted_for_entities bigint;
|
||||
tenant_ttl_ts bigint;
|
||||
customer_ttl_ts bigint;
|
||||
BEGIN
|
||||
OPEN tenant_cursor;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
WHILE FOUND
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
tenant_id_record, 'TTL') INTO tenant_ttl;
|
||||
if tenant_ttl IS NULL THEN
|
||||
tenant_ttl := system_ttl;
|
||||
END IF;
|
||||
IF tenant_ttl > 0 THEN
|
||||
tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint;
|
||||
deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record;
|
||||
END IF;
|
||||
FOR customer_id_record IN
|
||||
SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record
|
||||
LOOP
|
||||
EXECUTE format(
|
||||
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
|
||||
customer_id_record, 'TTL') INTO customer_ttl;
|
||||
IF customer_ttl IS NULL THEN
|
||||
customer_ttl_ts := tenant_ttl_ts;
|
||||
ELSE
|
||||
IF customer_ttl > 0 THEN
|
||||
customer_ttl_ts :=
|
||||
(EXTRACT(EPOCH FROM current_timestamp) * 1000 -
|
||||
customer_ttl::bigint * 1000)::bigint;
|
||||
END IF;
|
||||
END IF;
|
||||
IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN
|
||||
deleted_for_entities :=
|
||||
delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record;
|
||||
deleted_for_entities :=
|
||||
delete_device_records_from_ts_kv(tenant_id_record, customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record,
|
||||
customer_id_record,
|
||||
customer_ttl_ts);
|
||||
deleted := deleted + deleted_for_entities;
|
||||
RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
|
||||
END IF;
|
||||
END LOOP;
|
||||
FETCH tenant_cursor INTO tenant_id_record;
|
||||
END LOOP;
|
||||
END
|
||||
$$;
|
||||
|
||||
@ -85,9 +85,12 @@ function LoginService($http, $q) {
|
||||
return deferred.promise;
|
||||
}
|
||||
|
||||
function activate(activateToken, password) {
|
||||
function activate(activateToken, password, sendActivationMail) {
|
||||
var deferred = $q.defer();
|
||||
var url = '/api/noauth/activate';
|
||||
if(sendActivationMail === true || sendActivationMail === false) {
|
||||
url += '?sendActivationMail=' + sendActivationMail;
|
||||
}
|
||||
$http.post(url, {activateToken: activateToken, password: password}).then(function success(response) {
|
||||
deferred.resolve(response);
|
||||
}, function fail() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user