From 188c3e5b636e981cc3534c74bd27fdaaf6173fcd Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 10 Mar 2020 17:49:00 +0200 Subject: [PATCH] Upgrade Sql Ts & Timescale improvements (#2495) * psql & timescale ts upgrade improved * fix typo * fix typo 2 * removed tenant_id from timescale db schema & upgade scipt logic --- .../upgrade/2.4.3/schema_update_psql_ts.sql | 92 ++++++------- .../2.4.3/schema_update_timescale_ts.sql | 125 ++++++++---------- .../AbstractSqlTsDatabaseUpgradeService.java | 73 +++------- .../install/PsqlTsDatabaseUpgradeService.java | 51 ++++--- .../TimescaleTsDatabaseSchemaService.java | 4 +- .../TimescaleTsDatabaseUpgradeService.java | 54 ++++---- .../dao/model/sql/AbstractTsKvEntity.java | 5 + .../model/sqlts/latest/TsKvLatestEntity.java | 4 - .../ts/TimescaleTsKvCompositeKey.java | 1 - .../timescale/ts/TimescaleTsKvEntity.java | 19 +-- .../server/dao/model/sqlts/ts/TsKvEntity.java | 4 - ...stractChunkedAggregationTimeseriesDao.java | 36 ++--- .../dao/sqlts/AbstractSqlTimeseriesDao.java | 18 +-- .../TimescaleInsertTsRepository.java | 43 +++--- .../timescale/AggregationRepository.java | 25 ++-- .../timescale/TimescaleTimeseriesDao.java | 45 +++---- .../timescale/TsKvTimescaleRepository.java | 10 +- .../resources/sql/schema-timescale-idx.sql | 17 --- .../main/resources/sql/schema-timescale.sql | 5 +- .../server/dao/SqlDaoServiceTestSuite.java | 2 +- .../sql/timescale/drop-all-tables.sql | 2 +- 21 files changed, 259 insertions(+), 376 deletions(-) delete mode 100644 dao/src/main/resources/sql/schema-timescale-idx.sql diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql index 2d012336ab..3d17bbef2f 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql @@ -14,33 +14,27 @@ -- limitations under the License. -- --- select check_version(); +-- call check_version(); -CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$ +CREATE OR REPLACE PROCEDURE check_version(INOUT valid_version boolean) LANGUAGE plpgsql AS $BODY$ DECLARE current_version integer; - valid_version boolean; BEGIN RAISE NOTICE 'Check the current installed PostgreSQL version...'; SELECT current_setting('server_version_num') INTO current_version; - IF current_version < 100000 THEN - valid_version := FALSE; - ELSE - valid_version := TRUE; - END IF; - IF valid_version = FALSE THEN - RAISE NOTICE 'Postgres version should be at least more than 10!'; - ELSE + IF current_version > 110000 THEN RAISE NOTICE 'PostgreSQL version is valid!'; RAISE NOTICE 'Schema update started...'; + SELECT true INTO valid_version; + ELSE + RAISE NOTICE 'Postgres version should be at least more than 10!'; END IF; - RETURN valid_version; END; -$$ LANGUAGE 'plpgsql'; +$BODY$; --- select create_partition_ts_kv_table(); +-- call create_partition_ts_kv_table(); -CREATE OR REPLACE FUNCTION create_partition_ts_kv_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table() LANGUAGE plpgsql AS $$ BEGIN ALTER TABLE ts_kv @@ -57,11 +51,11 @@ BEGIN ALTER TABLE ts_kv ALTER COLUMN key TYPE integer USING key::integer; END; -$$ LANGUAGE 'plpgsql'; +$$; --- select create_new_ts_kv_latest_table(); +-- call create_new_ts_kv_latest_table(); -CREATE OR REPLACE FUNCTION create_new_ts_kv_latest_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$ BEGIN ALTER TABLE ts_kv_latest @@ -81,13 +75,13 @@ BEGIN ALTER TABLE ts_kv_latest ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key); END; -$$ LANGUAGE 'plpgsql'; +$$; --- select create_partitions(); +-- call create_partitions(); + +CREATE OR REPLACE PROCEDURE create_partitions() LANGUAGE plpgsql AS $$ -CREATE OR REPLACE FUNCTION create_partitions() RETURNS VOID AS -$$ DECLARE partition_date varchar; from_ts bigint; @@ -111,11 +105,11 @@ BEGIN CLOSE key_cursor; END; -$$ language 'plpgsql'; +$$; --- select create_ts_kv_dictionary_table(); +-- call create_ts_kv_dictionary_table(); -CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$ BEGIN CREATE TABLE IF NOT EXISTS ts_kv_dictionary @@ -125,12 +119,12 @@ BEGIN CONSTRAINT ts_key_id_pkey PRIMARY KEY (key) ); END; -$$ LANGUAGE 'plpgsql'; +$$; --- select insert_into_dictionary(); +-- call insert_into_dictionary(); + +CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$ -CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS -$$ DECLARE insert_record RECORD; key_cursor CURSOR FOR SELECT DISTINCT key @@ -150,28 +144,27 @@ BEGIN END LOOP; CLOSE key_cursor; END; -$$ language 'plpgsql'; +$$; --- select insert_into_ts_kv(); +-- call insert_into_ts_kv(); -CREATE OR REPLACE FUNCTION insert_into_ts_kv() RETURNS void AS -$$ +CREATE OR REPLACE PROCEDURE insert_into_ts_kv() LANGUAGE plpgsql AS $$ DECLARE insert_size CONSTANT integer := 10000; insert_counter integer DEFAULT 0; insert_record RECORD; - insert_cursor CURSOR FOR SELECT CONCAT(first_part_uuid, '-', second_part_uuid, '-1', third_part_uuid, '-', fourth_part_uuid, '-', fifth_part_uuid)::uuid AS entity_id, + insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id, ts_kv_records.key AS key, ts_kv_records.ts AS ts, ts_kv_records.bool_v AS bool_v, ts_kv_records.str_v AS str_v, ts_kv_records.long_v AS long_v, ts_kv_records.dbl_v AS dbl_v - FROM (SELECT SUBSTRING(entity_id, 8, 8) AS first_part_uuid, - SUBSTRING(entity_id, 4, 4) AS second_part_uuid, - SUBSTRING(entity_id, 1, 3) AS third_part_uuid, - SUBSTRING(entity_id, 16, 4) AS fourth_part_uuid, - SUBSTRING(entity_id, 20) AS fifth_part_uuid, + FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part, + SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part, + SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part, + SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part, + SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part, key_id AS key, ts, bool_v, @@ -198,28 +191,27 @@ BEGIN END LOOP; CLOSE insert_cursor; END; -$$ LANGUAGE 'plpgsql'; +$$; --- select insert_into_ts_kv_latest(); +-- call insert_into_ts_kv_latest(); -CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS -$$ +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest() LANGUAGE plpgsql AS $$ DECLARE insert_size CONSTANT integer := 10000; insert_counter integer DEFAULT 0; insert_record RECORD; - insert_cursor CURSOR FOR SELECT CONCAT(first_part_uuid, '-', second_part_uuid, '-1', third_part_uuid, '-', fourth_part_uuid, '-', fifth_part_uuid)::uuid AS entity_id, + insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id, ts_kv_latest_records.key AS key, ts_kv_latest_records.ts AS ts, ts_kv_latest_records.bool_v AS bool_v, ts_kv_latest_records.str_v AS str_v, ts_kv_latest_records.long_v AS long_v, ts_kv_latest_records.dbl_v AS dbl_v - FROM (SELECT SUBSTRING(entity_id, 8, 8) AS first_part_uuid, - SUBSTRING(entity_id, 4, 4) AS second_part_uuid, - SUBSTRING(entity_id, 1, 3) AS third_part_uuid, - SUBSTRING(entity_id, 16, 4) AS fourth_part_uuid, - SUBSTRING(entity_id, 20) AS fifth_part_uuid, + FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part, + SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part, + SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part, + SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part, + SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part, key_id AS key, ts, bool_v, @@ -246,6 +238,6 @@ BEGIN END LOOP; CLOSE insert_cursor; END; -$$ LANGUAGE 'plpgsql'; +$$; diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_timescale_ts.sql b/application/src/main/data/upgrade/2.4.3/schema_update_timescale_ts.sql index b8a3f1850e..ebbc6933ae 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_timescale_ts.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_timescale_ts.sql @@ -14,60 +14,51 @@ -- limitations under the License. -- --- select check_version(); +-- call check_version(); + +CREATE OR REPLACE PROCEDURE check_version(INOUT valid_version boolean) LANGUAGE plpgsql AS $BODY$ -CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$ DECLARE current_version integer; - valid_version boolean; BEGIN RAISE NOTICE 'Check the current installed PostgreSQL version...'; SELECT current_setting('server_version_num') INTO current_version; - IF current_version < 90600 THEN - valid_version := FALSE; - ELSE - valid_version := TRUE; - END IF; - IF valid_version = FALSE THEN - RAISE NOTICE 'Postgres version should be at least more than 9.6!'; - ELSE + IF current_version > 110000 THEN RAISE NOTICE 'PostgreSQL version is valid!'; RAISE NOTICE 'Schema update started...'; + SELECT true INTO valid_version; + ELSE + RAISE NOTICE 'Postgres version should be at least more than 10!'; END IF; - RETURN valid_version; END; -$$ LANGUAGE 'plpgsql'; +$BODY$; --- select create_new_tenant_ts_kv_table(); +-- call create_new_ts_kv_table(); -CREATE OR REPLACE FUNCTION create_new_tenant_ts_kv_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_new_ts_kv_table() LANGUAGE plpgsql AS $$ BEGIN ALTER TABLE tenant_ts_kv RENAME TO tenant_ts_kv_old; - CREATE TABLE IF NOT EXISTS tenant_ts_kv + CREATE TABLE IF NOT EXISTS ts_kv ( LIKE tenant_ts_kv_old ); - ALTER TABLE tenant_ts_kv - ALTER COLUMN tenant_id TYPE uuid USING tenant_id::uuid; - ALTER TABLE tenant_ts_kv - ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid; - ALTER TABLE tenant_ts_kv - ALTER COLUMN key TYPE integer USING key::integer; - ALTER TABLE tenant_ts_kv - ADD CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY(tenant_id, entity_id, key, ts); + ALTER TABLE ts_kv ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid; + ALTER TABLE ts_kv ALTER COLUMN key TYPE integer USING key::integer; + ALTER INDEX ts_kv_pkey RENAME TO tenant_ts_kv_pkey_old; ALTER INDEX idx_tenant_ts_kv RENAME TO idx_tenant_ts_kv_old; ALTER INDEX tenant_ts_kv_ts_idx RENAME TO tenant_ts_kv_ts_idx_old; --- PERFORM create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => 86400000, if_not_exists => true); - CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts); + ALTER TABLE ts_kv ADD CONSTRAINT ts_kv_pkey PRIMARY KEY(entity_id, key, ts); +-- CREATE INDEX IF NOT EXISTS ts_kv_ts_idx ON ts_kv(ts DESC); + ALTER TABLE ts_kv DROP COLUMN IF EXISTS tenant_id; END; -$$ LANGUAGE 'plpgsql'; +$$; --- select create_ts_kv_latest_table(); +-- call create_ts_kv_latest_table(); -CREATE OR REPLACE FUNCTION create_ts_kv_latest_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_ts_kv_latest_table() LANGUAGE plpgsql AS $$ BEGIN CREATE TABLE IF NOT EXISTS ts_kv_latest @@ -82,12 +73,12 @@ BEGIN CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); END; -$$ LANGUAGE 'plpgsql'; +$$; --- select create_ts_kv_dictionary_table(); +-- call create_ts_kv_dictionary_table(); -CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$ +CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$ BEGIN CREATE TABLE IF NOT EXISTS ts_kv_dictionary @@ -97,12 +88,12 @@ BEGIN CONSTRAINT ts_key_id_pkey PRIMARY KEY (key) ); END; -$$ LANGUAGE 'plpgsql'; +$$; --- select insert_into_dictionary(); +-- call insert_into_dictionary(); + +CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$ -CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS -$$ DECLARE insert_record RECORD; key_cursor CURSOR FOR SELECT DISTINCT key @@ -122,34 +113,28 @@ BEGIN END LOOP; CLOSE key_cursor; END; -$$ language 'plpgsql'; +$$; --- select insert_into_tenant_ts_kv(); +-- call insert_into_ts_kv(); + +CREATE OR REPLACE PROCEDURE insert_into_ts_kv() LANGUAGE plpgsql AS $$ -CREATE OR REPLACE FUNCTION insert_into_tenant_ts_kv() RETURNS void AS -$$ DECLARE insert_size CONSTANT integer := 10000; insert_counter integer DEFAULT 0; insert_record RECORD; - insert_cursor CURSOR FOR SELECT CONCAT(tenant_id_first_part_uuid, '-', tenant_id_second_part_uuid, '-1', tenant_id_third_part_uuid, '-', tenant_id_fourth_part_uuid, '-', tenant_id_fifth_part_uuid)::uuid AS tenant_id, - CONCAT(entity_id_first_part_uuid, '-', entity_id_second_part_uuid, '-1', entity_id_third_part_uuid, '-', entity_id_fourth_part_uuid, '-', entity_id_fifth_part_uuid)::uuid AS entity_id, - tenant_ts_kv_records.key AS key, - tenant_ts_kv_records.ts AS ts, - tenant_ts_kv_records.bool_v AS bool_v, - tenant_ts_kv_records.str_v AS str_v, - tenant_ts_kv_records.long_v AS long_v, - tenant_ts_kv_records.dbl_v AS dbl_v - FROM (SELECT SUBSTRING(tenant_id, 8, 8) AS tenant_id_first_part_uuid, - SUBSTRING(tenant_id, 4, 4) AS tenant_id_second_part_uuid, - SUBSTRING(tenant_id, 1, 3) AS tenant_id_third_part_uuid, - SUBSTRING(tenant_id, 16, 4) AS tenant_id_fourth_part_uuid, - SUBSTRING(tenant_id, 20) AS tenant_id_fifth_part_uuid, - SUBSTRING(entity_id, 8, 8) AS entity_id_first_part_uuid, - SUBSTRING(entity_id, 4, 4) AS entity_id_second_part_uuid, - SUBSTRING(entity_id, 1, 3) AS entity_id_third_part_uuid, - SUBSTRING(entity_id, 16, 4) AS entity_id_fourth_part_uuid, - SUBSTRING(entity_id, 20) AS entity_id_fifth_part_uuid, + insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id, + new_ts_kv_records.key AS key, + new_ts_kv_records.ts AS ts, + new_ts_kv_records.bool_v AS bool_v, + new_ts_kv_records.str_v AS str_v, + new_ts_kv_records.long_v AS long_v, + new_ts_kv_records.dbl_v AS dbl_v + FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part, + SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part, + SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part, + SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part, + SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part, key_id AS key, ts, bool_v, @@ -157,31 +142,31 @@ DECLARE long_v, dbl_v FROM tenant_ts_kv_old - INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS tenant_ts_kv_records; + INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS new_ts_kv_records; BEGIN OPEN insert_cursor; LOOP insert_counter := insert_counter + 1; FETCH insert_cursor INTO insert_record; IF NOT FOUND THEN - RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter - 1; + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter - 1; EXIT; END IF; - INSERT INTO tenant_ts_kv(tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) - VALUES (insert_record.tenant_id, insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v, + INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v) + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v, insert_record.long_v, insert_record.dbl_v); IF MOD(insert_counter, insert_size) = 0 THEN - RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter; + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter; END IF; END LOOP; CLOSE insert_cursor; END; -$$ LANGUAGE 'plpgsql'; +$$; --- select insert_into_ts_kv_latest(); +-- call insert_into_ts_kv_latest(); + +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest() LANGUAGE plpgsql AS $$ -CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS -$$ DECLARE insert_size CONSTANT integer := 10000; insert_counter integer DEFAULT 0; @@ -191,7 +176,7 @@ DECLARE latest_records.key AS key, latest_records.entity_id AS entity_id, latest_records.ts AS ts - FROM (SELECT DISTINCT key AS key, entity_id AS entity_id, MAX(ts) AS ts FROM tenant_ts_kv GROUP BY key, entity_id) AS latest_records; + FROM (SELECT DISTINCT key AS key, entity_id AS entity_id, MAX(ts) AS ts FROM ts_kv GROUP BY key, entity_id) AS latest_records; BEGIN OPEN insert_cursor; LOOP @@ -201,7 +186,7 @@ BEGIN RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter - 1; EXIT; END IF; - SELECT entity_id AS entity_id, key AS key, ts AS ts, bool_v AS bool_v, str_v AS str_v, long_v AS long_v, dbl_v AS dbl_v INTO insert_record FROM tenant_ts_kv WHERE entity_id = latest_record.entity_id AND key = latest_record.key AND ts = latest_record.ts; + SELECT entity_id AS entity_id, key AS key, ts AS ts, bool_v AS bool_v, str_v AS str_v, long_v AS long_v, dbl_v AS dbl_v INTO insert_record FROM ts_kv WHERE entity_id = latest_record.entity_id AND key = latest_record.key AND ts = latest_record.ts; INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v, insert_record.long_v, insert_record.dbl_v); IF MOD(insert_counter, insert_size) = 0 THEN @@ -210,4 +195,4 @@ BEGIN END LOOP; CLOSE insert_cursor; END; -$$ LANGUAGE 'plpgsql'; +$$; diff --git a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java index fe56ac129c..901f773515 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java @@ -22,38 +22,21 @@ import org.springframework.beans.factory.annotation.Value; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.sql.CallableStatement; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; -import java.sql.Types; +import java.sql.Statement; @Slf4j public abstract class AbstractSqlTsDatabaseUpgradeService { protected static final String CALL_REGEX = "call "; - protected static final String CHECK_VERSION = "check_version()"; + protected static final String CHECK_VERSION = "check_version(false)"; + protected static final String CHECK_VERSION_TO_DELETE = "check_version(INOUT valid_version boolean)"; protected static final String DROP_TABLE = "DROP TABLE "; - protected static final String DROP_FUNCTION_IF_EXISTS = "DROP FUNCTION IF EXISTS "; - - private static final String CALL_CHECK_VERSION = CALL_REGEX + CHECK_VERSION; - - - private static final String FUNCTION = "function: {}"; - private static final String DROP_STATEMENT = "drop statement: {}"; - private static final String QUERY = "query: {}"; - private static final String SUCCESSFULLY_EXECUTED = "Successfully executed "; - private static final String FAILED_TO_EXECUTE = "Failed to execute "; - private static final String FAILED_DUE_TO = " due to: {}"; - - protected static final String SUCCESSFULLY_EXECUTED_FUNCTION = SUCCESSFULLY_EXECUTED + FUNCTION; - protected static final String FAILED_TO_EXECUTE_FUNCTION_DUE_TO = FAILED_TO_EXECUTE + FUNCTION + FAILED_DUE_TO; - - protected static final String SUCCESSFULLY_EXECUTED_DROP_STATEMENT = SUCCESSFULLY_EXECUTED + DROP_STATEMENT; - protected static final String FAILED_TO_EXECUTE_DROP_STATEMENT = FAILED_TO_EXECUTE + DROP_STATEMENT + FAILED_DUE_TO; - - protected static final String SUCCESSFULLY_EXECUTED_QUERY = SUCCESSFULLY_EXECUTED + QUERY; - protected static final String FAILED_TO_EXECUTE_QUERY = FAILED_TO_EXECUTE + QUERY + FAILED_DUE_TO; + protected static final String DROP_PROCEDURE_IF_EXISTS = "DROP PROCEDURE IF EXISTS "; + protected static final String DROP_PROCEDURE_CHECK_VERSION = DROP_PROCEDURE_IF_EXISTS + CHECK_VERSION_TO_DELETE; @Value("${spring.datasource.url}") protected String dbUrl; @@ -78,23 +61,22 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { log.info("Check the current PostgreSQL version..."); boolean versionValid = false; try { - CallableStatement callableStatement = conn.prepareCall("{? = " + CALL_CHECK_VERSION + " }"); - callableStatement.registerOutParameter(1, Types.BOOLEAN); - callableStatement.execute(); - versionValid = callableStatement.getBoolean(1); - callableStatement.close(); + Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery(CALL_REGEX + CHECK_VERSION); + resultSet.next(); + versionValid = resultSet.getBoolean(1); + statement.close(); } catch (Exception e) { log.info("Failed to check current PostgreSQL version due to: {}", e.getMessage()); } return versionValid; } - protected void executeFunction(Connection conn, String query) { - log.info("{} ... ", query); + protected void executeQuery(Connection conn, String query) { try { - CallableStatement callableStatement = conn.prepareCall("{" + query + "}"); - callableStatement.execute(); - SQLWarning warnings = callableStatement.getWarnings(); + Statement statement = conn.createStatement(); + statement.execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + SQLWarning warnings = statement.getWarnings(); if (warnings != null) { log.info("{}", warnings.getMessage()); SQLWarning nextWarning = warnings.getNextWarning(); @@ -103,31 +85,10 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { nextWarning = nextWarning.getNextWarning(); } } - callableStatement.close(); - log.info(SUCCESSFULLY_EXECUTED_FUNCTION, query.replace(CALL_REGEX, "")); - Thread.sleep(2000); - } catch (Exception e) { - log.info(FAILED_TO_EXECUTE_FUNCTION_DUE_TO, query, e.getMessage()); - } - } - - protected void executeDropStatement(Connection conn, String query) { - try { - conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - log.info(SUCCESSFULLY_EXECUTED_DROP_STATEMENT, query); Thread.sleep(5000); + log.info("Successfully executed query: {}", query); } catch (InterruptedException | SQLException e) { - log.info(FAILED_TO_EXECUTE_DROP_STATEMENT, query, e.getMessage()); - } - } - - protected void executeQuery(Connection conn, String query) { - try { - conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - log.info(SUCCESSFULLY_EXECUTED_QUERY, query); - Thread.sleep(5000); - } catch (InterruptedException | SQLException e) { - log.info(FAILED_TO_EXECUTE_QUERY, query, e.getMessage()); + log.info("Failed to execute query: {} due to: {}", query, e.getMessage()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index eb951ed9ae..96f2c126a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -57,14 +57,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe private static final String DROP_TABLE_TS_KV_OLD = DROP_TABLE + TS_KV_OLD; private static final String DROP_TABLE_TS_KV_LATEST_OLD = DROP_TABLE + TS_KV_LATEST_OLD; - private static final String DROP_FUNCTION_CHECK_VERSION = DROP_FUNCTION_IF_EXISTS + CHECK_VERSION; - private static final String DROP_FUNCTION_CREATE_PARTITION_TS_KV_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_PARTITION_TS_KV_TABLE; - private static final String DROP_FUNCTION_CREATE_NEW_TS_KV_LATEST_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_NEW_TS_KV_LATEST_TABLE; - private static final String DROP_FUNCTION_CREATE_PARTITIONS = DROP_FUNCTION_IF_EXISTS + CREATE_PARTITIONS; - private static final String DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE; - private static final String DROP_FUNCTION_INSERT_INTO_DICTIONARY = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_DICTIONARY; - private static final String DROP_FUNCTION_INSERT_INTO_TS_KV = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV; - private static final String DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; + private static final String DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_PARTITION_TS_KV_TABLE; + private static final String DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_NEW_TS_KV_LATEST_TABLE; + private static final String DROP_PROCEDURE_CREATE_PARTITIONS = DROP_PROCEDURE_IF_EXISTS + CREATE_PARTITIONS; + private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE; + private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY; + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV; + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; @Override public void upgradeDatabase(String fromVersion) throws Exception { @@ -76,30 +75,30 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe loadSql(conn); boolean versionValid = checkVersion(conn); if (!versionValid) { - log.info("PostgreSQL version should be at least more than 10!"); + log.info("PostgreSQL version should be at least more than 11!"); log.info("Please upgrade your PostgreSQL and restart the script!"); } else { log.info("PostgreSQL version is valid!"); log.info("Updating schema ..."); - executeFunction(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); - executeFunction(conn, CALL_CREATE_PARTITIONS); - executeFunction(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); - executeFunction(conn, CALL_INSERT_INTO_DICTIONARY); - executeFunction(conn, CALL_INSERT_INTO_TS_KV); - executeFunction(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE); - executeFunction(conn, CALL_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); + executeQuery(conn, CALL_CREATE_PARTITIONS); + executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); + executeQuery(conn, CALL_INSERT_INTO_TS_KV); + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE); + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); - executeDropStatement(conn, DROP_TABLE_TS_KV_OLD); - executeDropStatement(conn, DROP_TABLE_TS_KV_LATEST_OLD); + executeQuery(conn, DROP_TABLE_TS_KV_OLD); + executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD); - executeDropStatement(conn, DROP_FUNCTION_CHECK_VERSION); - executeDropStatement(conn, DROP_FUNCTION_CREATE_PARTITION_TS_KV_TABLE); - executeDropStatement(conn, DROP_FUNCTION_CREATE_PARTITIONS); - executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_DICTIONARY); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV); - executeDropStatement(conn, DROP_FUNCTION_CREATE_NEW_TS_KV_LATEST_TABLE); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, DROP_PROCEDURE_CHECK_VERSION); + executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE); + executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); + executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;"); executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;"); diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java index 92a0a837fa..e8c542d956 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java @@ -45,13 +45,13 @@ public class TimescaleTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaS private long chunkTimeInterval; public TimescaleTsDatabaseSchemaService() { - super("schema-timescale.sql", "schema-timescale-idx.sql"); + super("schema-timescale.sql", null); } @Override public void createDatabaseSchema() throws Exception { super.createDatabaseSchema(); - executeQuery("SELECT create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); + executeQuery("SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); } private void executeQuery(String query) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index a2a9611581..e438f965c8 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -43,27 +43,27 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr private static final String TENANT_TS_KV_OLD_TABLE = "tenant_ts_kv_old;"; private static final String CREATE_TS_KV_LATEST_TABLE = "create_ts_kv_latest_table()"; - private static final String CREATE_NEW_TENANT_TS_KV_TABLE = "create_new_tenant_ts_kv_table()"; + private static final String CREATE_NEW_TS_KV_TABLE = "create_new_ts_kv_table()"; private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()"; private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()"; - private static final String INSERT_INTO_TENANT_TS_KV = "insert_into_tenant_ts_kv()"; + private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv()"; private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest()"; private static final String CALL_CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_TS_KV_LATEST_TABLE; - private static final String CALL_CREATE_NEW_TENANT_TS_KV_TABLE = CALL_REGEX + CREATE_NEW_TENANT_TS_KV_TABLE; + private static final String CALL_CREATE_NEW_TENANT_TS_KV_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_TABLE; private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE; private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY; - private static final String CALL_INSERT_INTO_TS_KV = CALL_REGEX + INSERT_INTO_TENANT_TS_KV; + private static final String CALL_INSERT_INTO_TS_KV = CALL_REGEX + INSERT_INTO_TS_KV; private static final String CALL_INSERT_INTO_TS_KV_LATEST = CALL_REGEX + INSERT_INTO_TS_KV_LATEST; private static final String DROP_OLD_TENANT_TS_KV_TABLE = DROP_TABLE + TENANT_TS_KV_OLD_TABLE; - private static final String DROP_FUNCTION_CREATE_TS_KV_LATEST_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_LATEST_TABLE; - private static final String DROP_FUNCTION_CREATE_TENANT_TS_KV_TABLE_COPY = DROP_FUNCTION_IF_EXISTS + CREATE_NEW_TENANT_TS_KV_TABLE; - private static final String DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE; - private static final String DROP_FUNCTION_INSERT_INTO_DICTIONARY = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_DICTIONARY; - private static final String DROP_FUNCTION_INSERT_INTO_TENANT_TS_KV = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TENANT_TS_KV; - private static final String DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; + private static final String DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_LATEST_TABLE; + private static final String DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY = DROP_PROCEDURE_IF_EXISTS + CREATE_NEW_TS_KV_TABLE; + private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE; + private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY; + private static final String DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV; + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; @Autowired private InstallScripts installScripts; @@ -78,33 +78,31 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr loadSql(conn); boolean versionValid = checkVersion(conn); if (!versionValid) { - log.info("PostgreSQL version should be at least more than 9.6!"); + log.info("PostgreSQL version should be at least more than 11!"); log.info("Please upgrade your PostgreSQL and restart the script!"); } else { log.info("PostgreSQL version is valid!"); log.info("Updating schema ..."); - executeFunction(conn, CALL_CREATE_TS_KV_LATEST_TABLE); - executeFunction(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); + executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE); + executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); - executeQuery(conn, "SELECT create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); + executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); - executeFunction(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); - executeFunction(conn, CALL_INSERT_INTO_DICTIONARY); - executeFunction(conn, CALL_INSERT_INTO_TS_KV); - executeFunction(conn, CALL_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); + executeQuery(conn, CALL_INSERT_INTO_TS_KV); + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); - //executeQuery(conn, "SELECT set_chunk_time_interval('tenant_ts_kv', " + chunkTimeInterval +");"); + executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE); - executeDropStatement(conn, DROP_OLD_TENANT_TS_KV_TABLE); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE); + executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY); + executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); - executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_LATEST_TABLE); - executeDropStatement(conn, DROP_FUNCTION_CREATE_TENANT_TS_KV_TABLE_COPY); - executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_DICTIONARY); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TENANT_TS_KV); - executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST); - - executeQuery(conn, "ALTER TABLE tenant_ts_kv ADD COLUMN json_v json;"); + executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;"); executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;"); log.info("schema timeseries updated!"); diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java index f0dd03b5ca..36d9421c32 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java @@ -36,6 +36,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLU import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.JSON_VALUE_COLUMN; +import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @@ -53,6 +54,10 @@ public abstract class AbstractTsKvEntity implements ToData { @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid") protected UUID entityId; + @Id + @Column(name = KEY_COLUMN) + protected int key; + @Id @Column(name = TS_COLUMN) protected Long ts; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java index 01fe8322e3..e7de4afa67 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java @@ -69,10 +69,6 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; }) public final class TsKvLatestEntity extends AbstractTsKvEntity { - @Id - @Column(name = KEY_COLUMN) - private int key; - @Override public boolean isNotEmpty() { return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java index e7db0572ec..afcf9b1d51 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java @@ -31,7 +31,6 @@ public class TimescaleTsKvCompositeKey implements Serializable { @Transient private static final long serialVersionUID = -4089175869616037523L; - private UUID tenantId; private UUID entityId; private int key; private long ts; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java index 76a95667a9..832a85d3e0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java @@ -18,25 +18,18 @@ package org.thingsboard.server.dao.model.sqlts.timescale.ts; import lombok.Data; import lombok.EqualsAndHashCode; import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.model.ToData; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; -import javax.persistence.Column; import javax.persistence.ColumnResult; import javax.persistence.ConstructorResult; import javax.persistence.Entity; -import javax.persistence.Id; import javax.persistence.IdClass; import javax.persistence.NamedNativeQueries; import javax.persistence.NamedNativeQuery; import javax.persistence.SqlResultSetMapping; import javax.persistence.SqlResultSetMappings; import javax.persistence.Table; -import java.util.UUID; -import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; -import static org.thingsboard.server.dao.model.ModelConstants.TENANT_ID_COLUMN; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG_QUERY; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_COUNT; @@ -52,7 +45,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F @Data @EqualsAndHashCode(callSuper = true) @Entity -@Table(name = "tenant_ts_kv") +@Table(name = "ts_kv") @IdClass(TimescaleTsKvCompositeKey.class) @SqlResultSetMappings({ @SqlResultSetMapping( @@ -116,15 +109,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F resultSetMapping = "timescaleCountMapping" ) }) -public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToData { - - @Id - @Column(name = TENANT_ID_COLUMN, columnDefinition = "uuid") - private UUID tenantId; - - @Id - @Column(name = KEY_COLUMN) - private int key; +public final class TimescaleTsKvEntity extends AbstractTsKvEntity { public TimescaleTsKvEntity() { } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java index 6d01b62d25..3a14d0c957 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java @@ -32,10 +32,6 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; @IdClass(TsKvCompositeKey.class) public final class TsKvEntity extends AbstractTsKvEntity { - @Id - @Column(name = KEY_COLUMN) - private int key; - public TsKvEntity() { } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 588f2ef0e4..c4ac4e9fd8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -96,7 +96,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Override public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return getRemoveLatestFuture(tenantId, entityId, query); + return getRemoveLatestFuture(entityId, query); } @Override @@ -125,9 +125,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } @Override - protected ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + protected ListenableFuture> findAllAsync(EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { - return findAllAsyncWithLimit(tenantId, entityId, query); + return findAllAsyncWithLimit(entityId, query); } else { long stepTs = query.getStartTs(); List>> futures = new ArrayList<>(); @@ -135,7 +135,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq long startTs = stepTs; long endTs = stepTs + query.getInterval(); long ts = startTs + (endTs - startTs) / 2; - futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); + futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); stepTs = endTs; } return getTskvEntriesFuture(Futures.allAsList(futures)); @@ -143,7 +143,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } @Override - protected ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + protected ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { Integer keyId = getOrSaveKeyId(query.getKey()); List tsKvEntities = tsKvRepository.findAllWithLimit( entityId.getId(), @@ -157,9 +157,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities)); } - protected ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { + private ListenableFuture> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { List> entitiesFutures = new ArrayList<>(); - switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures); + switchAggregation(entityId, key, startTs, endTs, aggregation, entitiesFutures); return Futures.transform(setFutures(entitiesFutures), entity -> { if (entity != null && entity.isNotEmpty()) { entity.setEntityId(entityId.getId()); @@ -172,29 +172,29 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq }, MoreExecutors.directExecutor()); } - protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List> entitiesFutures) { + protected void switchAggregation(EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List> entitiesFutures) { switch (aggregation) { case AVG: - findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures); + findAvg(entityId, key, startTs, endTs, entitiesFutures); break; case MAX: - findMax(tenantId, entityId, key, startTs, endTs, entitiesFutures); + findMax(entityId, key, startTs, endTs, entitiesFutures); break; case MIN: - findMin(tenantId, entityId, key, startTs, endTs, entitiesFutures); + findMin(entityId, key, startTs, endTs, entitiesFutures); break; case SUM: - findSum(tenantId, entityId, key, startTs, endTs, entitiesFutures); + findSum(entityId, key, startTs, endTs, entitiesFutures); break; case COUNT: - findCount(tenantId, entityId, key, startTs, endTs, entitiesFutures); + findCount(entityId, key, startTs, endTs, entitiesFutures); break; default: throw new IllegalArgumentException("Not supported aggregation type: " + aggregation); } } - protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + protected void findCount(EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { Integer keyId = getOrSaveKeyId(key); entitiesFutures.add(tsKvRepository.findCount( entityId.getId(), @@ -203,7 +203,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq endTs)); } - protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + protected void findSum(EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { Integer keyId = getOrSaveKeyId(key); entitiesFutures.add(tsKvRepository.findSum( entityId.getId(), @@ -212,7 +212,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq endTs)); } - protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + protected void findMin(EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { Integer keyId = getOrSaveKeyId(key); entitiesFutures.add(tsKvRepository.findStringMin( entityId.getId(), @@ -226,7 +226,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq endTs)); } - protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + protected void findMax(EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { Integer keyId = getOrSaveKeyId(key); entitiesFutures.add(tsKvRepository.findStringMax( entityId.getId(), @@ -240,7 +240,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq endTs)); } - protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + protected void findAvg(EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { Integer keyId = getOrSaveKeyId(key); entitiesFutures.add(tsKvRepository.findAvg( entityId.getId(), diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index a9277ec7e2..1d97aaddd8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -127,7 +127,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx protected ListenableFuture> processFindAllAsync(TenantId tenantId, EntityId entityId, List queries) { List>> futures = queries .stream() - .map(query -> findAllAsync(tenantId, entityId, query)) + .map(query -> findAllAsync(entityId, query)) .collect(Collectors.toList()); return Futures.transform(Futures.allAsList(futures), new Function>, List>() { @Nullable @@ -144,9 +144,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx }, service); } - protected abstract ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query); + protected abstract ListenableFuture> findAllAsync(EntityId entityId, ReadTsKvQuery query); - protected abstract ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query); + protected abstract ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query); protected ListenableFuture> getTskvEntriesFuture(ListenableFuture>> future) { return Futures.transform(future, new Function>, List>() { @@ -164,12 +164,12 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx }, service); } - protected ListenableFuture> findNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + protected ListenableFuture> findNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, Aggregation.NONE, DESC_ORDER); - return findAllAsync(tenantId, entityId, findNewLatestQuery); + return findAllAsync(entityId, findNewLatestQuery); } protected ListenableFuture getFindLatestFuture(EntityId entityId, String key) { @@ -189,7 +189,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx return Futures.immediateFuture(result); } - protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + protected ListenableFuture getRemoveLatestFuture(EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestFuture = getFindLatestFuture(entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestFuture, tsKvEntry -> { @@ -217,7 +217,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx if (query.getRewriteLatestIfDeleted()) { ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { - return getNewLatestEntryFuture(tenantId, entityId, query); + return getNewLatestEntryFuture(entityId, query); } return Futures.immediateFuture(null); }, service); @@ -296,8 +296,8 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx return keyId; } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); + private ListenableFuture getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) { + ListenableFuture> future = findNewLatestEntryFuture(entityId, query); return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { return getSaveLatestFuture(entityId, entryList.get(0)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java index 738ae52a9d..1fa1fc4219 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java @@ -37,8 +37,8 @@ import java.util.List; public class TimescaleInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository { private static final String INSERT_OR_UPDATE = - "INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + - "ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; + "INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + + "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; @Override public void saveOrUpdate(List> entities) { @@ -46,41 +46,40 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem @Override public void setValues(PreparedStatement ps, int i) throws SQLException { TimescaleTsKvEntity tsKvEntity = entities.get(i).getEntity(); - ps.setObject(1, tsKvEntity.getTenantId()); - ps.setObject(2, tsKvEntity.getEntityId()); - ps.setInt(3, tsKvEntity.getKey()); - ps.setLong(4, tsKvEntity.getTs()); + ps.setObject(1, tsKvEntity.getEntityId()); + ps.setInt(2, tsKvEntity.getKey()); + ps.setLong(3, tsKvEntity.getTs()); if (tsKvEntity.getBooleanValue() != null) { - ps.setBoolean(5, tsKvEntity.getBooleanValue()); - ps.setBoolean(10, tsKvEntity.getBooleanValue()); + ps.setBoolean(4, tsKvEntity.getBooleanValue()); + ps.setBoolean(9, tsKvEntity.getBooleanValue()); } else { - ps.setNull(5, Types.BOOLEAN); - ps.setNull(10, Types.BOOLEAN); + ps.setNull(4, Types.BOOLEAN); + ps.setNull(9, Types.BOOLEAN); } - ps.setString(6, replaceNullChars(tsKvEntity.getStrValue())); - ps.setString(11, replaceNullChars(tsKvEntity.getStrValue())); + ps.setString(5, replaceNullChars(tsKvEntity.getStrValue())); + ps.setString(10, replaceNullChars(tsKvEntity.getStrValue())); if (tsKvEntity.getLongValue() != null) { - ps.setLong(7, tsKvEntity.getLongValue()); - ps.setLong(12, tsKvEntity.getLongValue()); + ps.setLong(6, tsKvEntity.getLongValue()); + ps.setLong(11, tsKvEntity.getLongValue()); } else { - ps.setNull(7, Types.BIGINT); - ps.setNull(12, Types.BIGINT); + ps.setNull(6, Types.BIGINT); + ps.setNull(11, Types.BIGINT); } if (tsKvEntity.getDoubleValue() != null) { - ps.setDouble(8, tsKvEntity.getDoubleValue()); - ps.setDouble(13, tsKvEntity.getDoubleValue()); + ps.setDouble(7, tsKvEntity.getDoubleValue()); + ps.setDouble(12, tsKvEntity.getDoubleValue()); } else { - ps.setNull(8, Types.DOUBLE); - ps.setNull(13, Types.DOUBLE); + ps.setNull(7, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); } - ps.setString(9, replaceNullChars(tsKvEntity.getJsonValue())); - ps.setString(14, replaceNullChars(tsKvEntity.getJsonValue())); + ps.setString(8, replaceNullChars(tsKvEntity.getJsonValue())); + ps.setString(13, replaceNullChars(tsKvEntity.getJsonValue())); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java index ed784b96ba..28b666b03b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java @@ -36,7 +36,7 @@ public class AggregationRepository { public static final String FIND_SUM = "findSum"; public static final String FIND_COUNT = "findCount"; - public static final String FROM_WHERE_CLAUSE = "FROM tenant_ts_kv tskv WHERE tskv.tenant_id = cast(:tenantId AS uuid) AND tskv.entity_id = cast(:entityId AS uuid) AND tskv.key= cast(:entityKey AS int) AND tskv.ts > :startTs AND tskv.ts <= :endTs GROUP BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket ORDER BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket"; + public static final String FROM_WHERE_CLAUSE = "FROM ts_kv tskv WHERE tskv.entity_id = cast(:entityId AS uuid) AND tskv.key= cast(:entityKey AS int) AND tskv.ts > :startTs AND tskv.ts <= :endTs GROUP BY tskv.entity_id, tskv.key, tsBucket ORDER BY tskv.entity_id, tskv.key, tsBucket"; public static final String FIND_AVG_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(COALESCE(tskv.long_v, 0)) AS longValue, SUM(COALESCE(tskv.dbl_v, 0.0)) AS doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, null AS strValue, 'AVG' AS aggType "; @@ -52,43 +52,42 @@ public class AggregationRepository { private EntityManager entityManager; @Async - public CompletableFuture> findAvg(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { + public CompletableFuture> findAvg(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { @SuppressWarnings("unchecked") - List resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_AVG); + List resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_AVG); return CompletableFuture.supplyAsync(() -> resultList); } @Async - public CompletableFuture> findMax(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { + public CompletableFuture> findMax(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { @SuppressWarnings("unchecked") - List resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_MAX); + List resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_MAX); return CompletableFuture.supplyAsync(() -> resultList); } @Async - public CompletableFuture> findMin(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { + public CompletableFuture> findMin(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { @SuppressWarnings("unchecked") - List resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_MIN); + List resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_MIN); return CompletableFuture.supplyAsync(() -> resultList); } @Async - public CompletableFuture> findSum(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { + public CompletableFuture> findSum(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { @SuppressWarnings("unchecked") - List resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_SUM); + List resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_SUM); return CompletableFuture.supplyAsync(() -> resultList); } @Async - public CompletableFuture> findCount(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { + public CompletableFuture> findCount(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) { @SuppressWarnings("unchecked") - List resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_COUNT); + List resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_COUNT); return CompletableFuture.supplyAsync(() -> resultList); } - private List getResultList(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs, String query) { + private List getResultList(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs, String query) { return entityManager.createNamedQuery(query) - .setParameter("tenantId", tenantId) .setParameter("entityId", entityId) .setParameter("entityKey", entityKey) .setParameter("timeBucket", timeBucket) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 4ca53a337b..bf4cf5d9e6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -88,24 +88,23 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } @Override - protected ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + protected ListenableFuture> findAllAsync(EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { - return findAllAsyncWithLimit(tenantId, entityId, query); + return findAllAsyncWithLimit(entityId, query); } else { long startTs = query.getStartTs(); long endTs = query.getEndTs(); long timeBucket = query.getInterval(); - ListenableFuture>> future = findAllAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation()); + ListenableFuture>> future = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation()); return getTskvEntriesFuture(future); } } @Override - protected ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + protected ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = getOrSaveKeyId(strKey); List timescaleTsKvEntities = tsKvRepository.findAllWithLimit( - tenantId.getId(), entityId.getId(), keyId, query.getStartTs(), @@ -117,8 +116,8 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities)); } - private ListenableFuture>> findAllAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) { - CompletableFuture> listCompletableFuture = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId(), tenantId.getId()); + private ListenableFuture>> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) { + CompletableFuture> listCompletableFuture = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId()); SettableFuture> listenableFuture = SettableFuture.create(); listCompletableFuture.whenComplete((timescaleTsKvEntities, throwable) -> { if (throwable != null) { @@ -133,7 +132,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements timescaleTsKvEntities.forEach(entity -> { if (entity != null && entity.isNotEmpty()) { entity.setEntityId(entityId.getId()); - entity.setTenantId(tenantId.getId()); entity.setStrKey(key); result.add(Optional.of(DaoUtil.getData(entity))); } else { @@ -167,7 +165,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements String strKey = tsKvEntry.getKey(); Integer keyId = getOrSaveKeyId(strKey); TimescaleTsKvEntity entity = new TimescaleTsKvEntity(); - entity.setTenantId(tenantId.getId()); entity.setEntityId(entityId.getId()); entity.setTs(tsKvEntry.getTs()); entity.setKey(keyId); @@ -197,7 +194,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements Integer keyId = getOrSaveKeyId(strKey); return service.submit(() -> { tsKvRepository.delete( - tenantId.getId(), entityId.getId(), keyId, query.getStartTs(), @@ -208,7 +204,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Override public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return getRemoveLatestFuture(tenantId, entityId, query); + return getRemoveLatestFuture(entityId, query); } @Override @@ -216,27 +212,26 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements return service.submit(() -> null); } - private CompletableFuture> switchAggregation(String key, long startTs, long endTs, long timeBucket, Aggregation aggregation, UUID entityId, UUID tenantId) { + private CompletableFuture> switchAggregation(String key, long startTs, long endTs, long timeBucket, Aggregation aggregation, UUID entityId) { switch (aggregation) { case AVG: - return findAvg(key, startTs, endTs, timeBucket, entityId, tenantId); + return findAvg(key, startTs, endTs, timeBucket, entityId); case MAX: - return findMax(key, startTs, endTs, timeBucket, entityId, tenantId); + return findMax(key, startTs, endTs, timeBucket, entityId); case MIN: - return findMin(key, startTs, endTs, timeBucket, entityId, tenantId); + return findMin(key, startTs, endTs, timeBucket, entityId); case SUM: - return findSum(key, startTs, endTs, timeBucket, entityId, tenantId); + return findSum(key, startTs, endTs, timeBucket, entityId); case COUNT: - return findCount(key, startTs, endTs, timeBucket, entityId, tenantId); + return findCount(key, startTs, endTs, timeBucket, entityId); default: throw new IllegalArgumentException("Not supported aggregation type: " + aggregation); } } - private CompletableFuture> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) { + private CompletableFuture> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId) { Integer keyId = getOrSaveKeyId(key); return aggregationRepository.findCount( - tenantId, entityId, keyId, timeBucket, @@ -244,10 +239,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements endTs); } - private CompletableFuture> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) { + private CompletableFuture> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId) { Integer keyId = getOrSaveKeyId(key); return aggregationRepository.findSum( - tenantId, entityId, keyId, timeBucket, @@ -255,10 +249,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements endTs); } - private CompletableFuture> findMin(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) { + private CompletableFuture> findMin(String key, long startTs, long endTs, long timeBucket, UUID entityId) { Integer keyId = getOrSaveKeyId(key); return aggregationRepository.findMin( - tenantId, entityId, keyId, timeBucket, @@ -266,10 +259,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements endTs); } - private CompletableFuture> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) { + private CompletableFuture> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId) { Integer keyId = getOrSaveKeyId(key); return aggregationRepository.findMax( - tenantId, entityId, keyId, timeBucket, @@ -277,10 +269,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements endTs); } - private CompletableFuture> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) { + private CompletableFuture> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId) { Integer keyId = getOrSaveKeyId(key); return aggregationRepository.findAvg( - tenantId, entityId, keyId, timeBucket, diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java index fb9cb6f7fe..d4e80dc1f5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java @@ -31,12 +31,10 @@ import java.util.UUID; @TimescaleDBTsDao public interface TsKvTimescaleRepository extends CrudRepository { - @Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.tenantId = :tenantId " + - "AND tskv.entityId = :entityId " + + @Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " + "AND tskv.key = :entityKey " + "AND tskv.ts > :startTs AND tskv.ts <= :endTs") List findAllWithLimit( - @Param("tenantId") UUID tenantId, @Param("entityId") UUID entityId, @Param("entityKey") int key, @Param("startTs") long startTs, @@ -44,12 +42,10 @@ public interface TsKvTimescaleRepository extends CrudRepository :startTs AND tskv.ts <= :endTs") - void delete(@Param("tenantId") UUID tenantId, - @Param("entityId") UUID entityId, + void delete(@Param("entityId") UUID entityId, @Param("entityKey") int key, @Param("startTs") long startTs, @Param("endTs") long endTs); diff --git a/dao/src/main/resources/sql/schema-timescale-idx.sql b/dao/src/main/resources/sql/schema-timescale-idx.sql deleted file mode 100644 index b9a3737d47..0000000000 --- a/dao/src/main/resources/sql/schema-timescale-idx.sql +++ /dev/null @@ -1,17 +0,0 @@ --- --- Copyright © 2016-2020 The Thingsboard Authors --- --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - -CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts); \ No newline at end of file diff --git a/dao/src/main/resources/sql/schema-timescale.sql b/dao/src/main/resources/sql/schema-timescale.sql index 7251d8be4e..b95c8b86ba 100644 --- a/dao/src/main/resources/sql/schema-timescale.sql +++ b/dao/src/main/resources/sql/schema-timescale.sql @@ -16,8 +16,7 @@ CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; -CREATE TABLE IF NOT EXISTS tenant_ts_kv ( - tenant_id uuid NOT NULL, +CREATE TABLE IF NOT EXISTS ts_kv ( entity_id uuid NOT NULL, key int NOT NULL, ts bigint NOT NULL, @@ -26,7 +25,7 @@ CREATE TABLE IF NOT EXISTS tenant_ts_kv ( long_v bigint, dbl_v double precision, json_v json, - CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY (tenant_id, entity_id, key, ts) + CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts) ); CREATE TABLE IF NOT EXISTS ts_kv_dictionary ( diff --git a/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java index 7ebab237a8..6d306c4e71 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java @@ -44,7 +44,7 @@ public class SqlDaoServiceTestSuite { // @ClassRule // public static CustomSqlUnit sqlUnit = new CustomSqlUnit( -// Arrays.asList("sql/schema-timescale.sql", "sql/schema-timescale-idx.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), +// Arrays.asList("sql/schema-timescale.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), // "sql/timescale/drop-all-tables.sql", // "sql-test.properties" // ); diff --git a/dao/src/test/resources/sql/timescale/drop-all-tables.sql b/dao/src/test/resources/sql/timescale/drop-all-tables.sql index 08d018dc1b..ac921c0f4a 100644 --- a/dao/src/test/resources/sql/timescale/drop-all-tables.sql +++ b/dao/src/test/resources/sql/timescale/drop-all-tables.sql @@ -12,7 +12,7 @@ DROP TABLE IF EXISTS event; DROP TABLE IF EXISTS relation; DROP TABLE IF EXISTS tb_user; DROP TABLE IF EXISTS tenant; -DROP TABLE IF EXISTS tenant_ts_kv; +DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv_latest; DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS widget_type;