Upgrade Sql Ts & Timescale improvements (#2495)

* improved the psql & timescale ts upgrade scripts

* fix typo

* fix typo 2

* removed tenant_id from timescale db schema & upgrade script logic
This commit is contained in:
ShvaykaD 2020-03-10 17:49:00 +02:00 committed by GitHub
parent aabc22d7d2
commit 188c3e5b63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 259 additions and 376 deletions

View File

@ -14,33 +14,27 @@
-- limitations under the License.
--
-- select check_version();
-- call check_version();
CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$
CREATE OR REPLACE PROCEDURE check_version(INOUT valid_version boolean) LANGUAGE plpgsql AS $BODY$
DECLARE
current_version integer;
valid_version boolean;
BEGIN
RAISE NOTICE 'Check the current installed PostgreSQL version...';
SELECT current_setting('server_version_num') INTO current_version;
IF current_version < 100000 THEN
valid_version := FALSE;
ELSE
valid_version := TRUE;
END IF;
IF valid_version = FALSE THEN
RAISE NOTICE 'Postgres version should be at least more than 10!';
ELSE
IF current_version > 110000 THEN
RAISE NOTICE 'PostgreSQL version is valid!';
RAISE NOTICE 'Schema update started...';
SELECT true INTO valid_version;
ELSE
RAISE NOTICE 'Postgres version should be at least more than 10!';
END IF;
RETURN valid_version;
END;
$$ LANGUAGE 'plpgsql';
$BODY$;
-- select create_partition_ts_kv_table();
-- call create_partition_ts_kv_table();
CREATE OR REPLACE FUNCTION create_partition_ts_kv_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table() LANGUAGE plpgsql AS $$
BEGIN
ALTER TABLE ts_kv
@ -57,11 +51,11 @@ BEGIN
ALTER TABLE ts_kv
ALTER COLUMN key TYPE integer USING key::integer;
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select create_new_ts_kv_latest_table();
-- call create_new_ts_kv_latest_table();
CREATE OR REPLACE FUNCTION create_new_ts_kv_latest_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$
BEGIN
ALTER TABLE ts_kv_latest
@ -81,13 +75,13 @@ BEGIN
ALTER TABLE ts_kv_latest
ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select create_partitions();
-- call create_partitions();
CREATE OR REPLACE PROCEDURE create_partitions() LANGUAGE plpgsql AS $$
CREATE OR REPLACE FUNCTION create_partitions() RETURNS VOID AS
$$
DECLARE
partition_date varchar;
from_ts bigint;
@ -111,11 +105,11 @@ BEGIN
CLOSE key_cursor;
END;
$$ language 'plpgsql';
$$;
-- select create_ts_kv_dictionary_table();
-- call create_ts_kv_dictionary_table();
CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$
BEGIN
CREATE TABLE IF NOT EXISTS ts_kv_dictionary
@ -125,12 +119,12 @@ BEGIN
CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
);
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select insert_into_dictionary();
-- call insert_into_dictionary();
CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$
CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS
$$
DECLARE
insert_record RECORD;
key_cursor CURSOR FOR SELECT DISTINCT key
@ -150,28 +144,27 @@ BEGIN
END LOOP;
CLOSE key_cursor;
END;
$$ language 'plpgsql';
$$;
-- select insert_into_ts_kv();
-- call insert_into_ts_kv();
CREATE OR REPLACE FUNCTION insert_into_ts_kv() RETURNS void AS
$$
CREATE OR REPLACE PROCEDURE insert_into_ts_kv() LANGUAGE plpgsql AS $$
DECLARE
insert_size CONSTANT integer := 10000;
insert_counter integer DEFAULT 0;
insert_record RECORD;
insert_cursor CURSOR FOR SELECT CONCAT(first_part_uuid, '-', second_part_uuid, '-1', third_part_uuid, '-', fourth_part_uuid, '-', fifth_part_uuid)::uuid AS entity_id,
insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id,
ts_kv_records.key AS key,
ts_kv_records.ts AS ts,
ts_kv_records.bool_v AS bool_v,
ts_kv_records.str_v AS str_v,
ts_kv_records.long_v AS long_v,
ts_kv_records.dbl_v AS dbl_v
FROM (SELECT SUBSTRING(entity_id, 8, 8) AS first_part_uuid,
SUBSTRING(entity_id, 4, 4) AS second_part_uuid,
SUBSTRING(entity_id, 1, 3) AS third_part_uuid,
SUBSTRING(entity_id, 16, 4) AS fourth_part_uuid,
SUBSTRING(entity_id, 20) AS fifth_part_uuid,
FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part,
SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part,
SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part,
SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part,
SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part,
key_id AS key,
ts,
bool_v,
@ -198,28 +191,27 @@ BEGIN
END LOOP;
CLOSE insert_cursor;
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select insert_into_ts_kv_latest();
-- call insert_into_ts_kv_latest();
CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS
$$
CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest() LANGUAGE plpgsql AS $$
DECLARE
insert_size CONSTANT integer := 10000;
insert_counter integer DEFAULT 0;
insert_record RECORD;
insert_cursor CURSOR FOR SELECT CONCAT(first_part_uuid, '-', second_part_uuid, '-1', third_part_uuid, '-', fourth_part_uuid, '-', fifth_part_uuid)::uuid AS entity_id,
insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id,
ts_kv_latest_records.key AS key,
ts_kv_latest_records.ts AS ts,
ts_kv_latest_records.bool_v AS bool_v,
ts_kv_latest_records.str_v AS str_v,
ts_kv_latest_records.long_v AS long_v,
ts_kv_latest_records.dbl_v AS dbl_v
FROM (SELECT SUBSTRING(entity_id, 8, 8) AS first_part_uuid,
SUBSTRING(entity_id, 4, 4) AS second_part_uuid,
SUBSTRING(entity_id, 1, 3) AS third_part_uuid,
SUBSTRING(entity_id, 16, 4) AS fourth_part_uuid,
SUBSTRING(entity_id, 20) AS fifth_part_uuid,
FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part,
SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part,
SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part,
SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part,
SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part,
key_id AS key,
ts,
bool_v,
@ -246,6 +238,6 @@ BEGIN
END LOOP;
CLOSE insert_cursor;
END;
$$ LANGUAGE 'plpgsql';
$$;

View File

@ -14,60 +14,51 @@
-- limitations under the License.
--
-- select check_version();
-- call check_version();
CREATE OR REPLACE PROCEDURE check_version(INOUT valid_version boolean) LANGUAGE plpgsql AS $BODY$
CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$
DECLARE
current_version integer;
valid_version boolean;
BEGIN
RAISE NOTICE 'Check the current installed PostgreSQL version...';
SELECT current_setting('server_version_num') INTO current_version;
IF current_version < 90600 THEN
valid_version := FALSE;
ELSE
valid_version := TRUE;
END IF;
IF valid_version = FALSE THEN
RAISE NOTICE 'Postgres version should be at least more than 9.6!';
ELSE
IF current_version > 110000 THEN
RAISE NOTICE 'PostgreSQL version is valid!';
RAISE NOTICE 'Schema update started...';
SELECT true INTO valid_version;
ELSE
RAISE NOTICE 'Postgres version should be at least more than 10!';
END IF;
RETURN valid_version;
END;
$$ LANGUAGE 'plpgsql';
$BODY$;
-- select create_new_tenant_ts_kv_table();
-- call create_new_ts_kv_table();
CREATE OR REPLACE FUNCTION create_new_tenant_ts_kv_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_new_ts_kv_table() LANGUAGE plpgsql AS $$
BEGIN
ALTER TABLE tenant_ts_kv
RENAME TO tenant_ts_kv_old;
CREATE TABLE IF NOT EXISTS tenant_ts_kv
CREATE TABLE IF NOT EXISTS ts_kv
(
LIKE tenant_ts_kv_old
);
ALTER TABLE tenant_ts_kv
ALTER COLUMN tenant_id TYPE uuid USING tenant_id::uuid;
ALTER TABLE tenant_ts_kv
ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
ALTER TABLE tenant_ts_kv
ALTER COLUMN key TYPE integer USING key::integer;
ALTER TABLE tenant_ts_kv
ADD CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY(tenant_id, entity_id, key, ts);
ALTER TABLE ts_kv ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
ALTER TABLE ts_kv ALTER COLUMN key TYPE integer USING key::integer;
ALTER INDEX ts_kv_pkey RENAME TO tenant_ts_kv_pkey_old;
ALTER INDEX idx_tenant_ts_kv RENAME TO idx_tenant_ts_kv_old;
ALTER INDEX tenant_ts_kv_ts_idx RENAME TO tenant_ts_kv_ts_idx_old;
-- PERFORM create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => 86400000, if_not_exists => true);
CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts);
ALTER TABLE ts_kv ADD CONSTRAINT ts_kv_pkey PRIMARY KEY(entity_id, key, ts);
-- CREATE INDEX IF NOT EXISTS ts_kv_ts_idx ON ts_kv(ts DESC);
ALTER TABLE ts_kv DROP COLUMN IF EXISTS tenant_id;
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select create_ts_kv_latest_table();
-- call create_ts_kv_latest_table();
CREATE OR REPLACE FUNCTION create_ts_kv_latest_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_ts_kv_latest_table() LANGUAGE plpgsql AS $$
BEGIN
CREATE TABLE IF NOT EXISTS ts_kv_latest
@ -82,12 +73,12 @@ BEGIN
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select create_ts_kv_dictionary_table();
-- call create_ts_kv_dictionary_table();
CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$
CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$
BEGIN
CREATE TABLE IF NOT EXISTS ts_kv_dictionary
@ -97,12 +88,12 @@ BEGIN
CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
);
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select insert_into_dictionary();
-- call insert_into_dictionary();
CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$
CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS
$$
DECLARE
insert_record RECORD;
key_cursor CURSOR FOR SELECT DISTINCT key
@ -122,34 +113,28 @@ BEGIN
END LOOP;
CLOSE key_cursor;
END;
$$ language 'plpgsql';
$$;
-- select insert_into_tenant_ts_kv();
-- call insert_into_ts_kv();
CREATE OR REPLACE PROCEDURE insert_into_ts_kv() LANGUAGE plpgsql AS $$
CREATE OR REPLACE FUNCTION insert_into_tenant_ts_kv() RETURNS void AS
$$
DECLARE
insert_size CONSTANT integer := 10000;
insert_counter integer DEFAULT 0;
insert_record RECORD;
insert_cursor CURSOR FOR SELECT CONCAT(tenant_id_first_part_uuid, '-', tenant_id_second_part_uuid, '-1', tenant_id_third_part_uuid, '-', tenant_id_fourth_part_uuid, '-', tenant_id_fifth_part_uuid)::uuid AS tenant_id,
CONCAT(entity_id_first_part_uuid, '-', entity_id_second_part_uuid, '-1', entity_id_third_part_uuid, '-', entity_id_fourth_part_uuid, '-', entity_id_fifth_part_uuid)::uuid AS entity_id,
tenant_ts_kv_records.key AS key,
tenant_ts_kv_records.ts AS ts,
tenant_ts_kv_records.bool_v AS bool_v,
tenant_ts_kv_records.str_v AS str_v,
tenant_ts_kv_records.long_v AS long_v,
tenant_ts_kv_records.dbl_v AS dbl_v
FROM (SELECT SUBSTRING(tenant_id, 8, 8) AS tenant_id_first_part_uuid,
SUBSTRING(tenant_id, 4, 4) AS tenant_id_second_part_uuid,
SUBSTRING(tenant_id, 1, 3) AS tenant_id_third_part_uuid,
SUBSTRING(tenant_id, 16, 4) AS tenant_id_fourth_part_uuid,
SUBSTRING(tenant_id, 20) AS tenant_id_fifth_part_uuid,
SUBSTRING(entity_id, 8, 8) AS entity_id_first_part_uuid,
SUBSTRING(entity_id, 4, 4) AS entity_id_second_part_uuid,
SUBSTRING(entity_id, 1, 3) AS entity_id_third_part_uuid,
SUBSTRING(entity_id, 16, 4) AS entity_id_fourth_part_uuid,
SUBSTRING(entity_id, 20) AS entity_id_fifth_part_uuid,
insert_cursor CURSOR FOR SELECT CONCAT(entity_id_uuid_first_part, '-', entity_id_uuid_second_part, '-1', entity_id_uuid_third_part, '-', entity_id_uuid_fourth_part, '-', entity_id_uuid_fifth_part)::uuid AS entity_id,
new_ts_kv_records.key AS key,
new_ts_kv_records.ts AS ts,
new_ts_kv_records.bool_v AS bool_v,
new_ts_kv_records.str_v AS str_v,
new_ts_kv_records.long_v AS long_v,
new_ts_kv_records.dbl_v AS dbl_v
FROM (SELECT SUBSTRING(entity_id, 8, 8) AS entity_id_uuid_first_part,
SUBSTRING(entity_id, 4, 4) AS entity_id_uuid_second_part,
SUBSTRING(entity_id, 1, 3) AS entity_id_uuid_third_part,
SUBSTRING(entity_id, 16, 4) AS entity_id_uuid_fourth_part,
SUBSTRING(entity_id, 20) AS entity_id_uuid_fifth_part,
key_id AS key,
ts,
bool_v,
@ -157,31 +142,31 @@ DECLARE
long_v,
dbl_v
FROM tenant_ts_kv_old
INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS tenant_ts_kv_records;
INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS new_ts_kv_records;
BEGIN
OPEN insert_cursor;
LOOP
insert_counter := insert_counter + 1;
FETCH insert_cursor INTO insert_record;
IF NOT FOUND THEN
RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter - 1;
RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter - 1;
EXIT;
END IF;
INSERT INTO tenant_ts_kv(tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
VALUES (insert_record.tenant_id, insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
insert_record.long_v, insert_record.dbl_v);
IF MOD(insert_counter, insert_size) = 0 THEN
RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter;
RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter;
END IF;
END LOOP;
CLOSE insert_cursor;
END;
$$ LANGUAGE 'plpgsql';
$$;
-- select insert_into_ts_kv_latest();
-- call insert_into_ts_kv_latest();
CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest() LANGUAGE plpgsql AS $$
CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS
$$
DECLARE
insert_size CONSTANT integer := 10000;
insert_counter integer DEFAULT 0;
@ -191,7 +176,7 @@ DECLARE
latest_records.key AS key,
latest_records.entity_id AS entity_id,
latest_records.ts AS ts
FROM (SELECT DISTINCT key AS key, entity_id AS entity_id, MAX(ts) AS ts FROM tenant_ts_kv GROUP BY key, entity_id) AS latest_records;
FROM (SELECT DISTINCT key AS key, entity_id AS entity_id, MAX(ts) AS ts FROM ts_kv GROUP BY key, entity_id) AS latest_records;
BEGIN
OPEN insert_cursor;
LOOP
@ -201,7 +186,7 @@ BEGIN
RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter - 1;
EXIT;
END IF;
SELECT entity_id AS entity_id, key AS key, ts AS ts, bool_v AS bool_v, str_v AS str_v, long_v AS long_v, dbl_v AS dbl_v INTO insert_record FROM tenant_ts_kv WHERE entity_id = latest_record.entity_id AND key = latest_record.key AND ts = latest_record.ts;
SELECT entity_id AS entity_id, key AS key, ts AS ts, bool_v AS bool_v, str_v AS str_v, long_v AS long_v, dbl_v AS dbl_v INTO insert_record FROM ts_kv WHERE entity_id = latest_record.entity_id AND key = latest_record.key AND ts = latest_record.ts;
INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v, insert_record.long_v, insert_record.dbl_v);
IF MOD(insert_counter, insert_size) = 0 THEN
@ -210,4 +195,4 @@ BEGIN
END LOOP;
CLOSE insert_cursor;
END;
$$ LANGUAGE 'plpgsql';
$$;

View File

@ -22,38 +22,21 @@ import org.springframework.beans.factory.annotation.Value;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Types;
import java.sql.Statement;
@Slf4j
public abstract class AbstractSqlTsDatabaseUpgradeService {
protected static final String CALL_REGEX = "call ";
protected static final String CHECK_VERSION = "check_version()";
protected static final String CHECK_VERSION = "check_version(false)";
protected static final String CHECK_VERSION_TO_DELETE = "check_version(INOUT valid_version boolean)";
protected static final String DROP_TABLE = "DROP TABLE ";
protected static final String DROP_FUNCTION_IF_EXISTS = "DROP FUNCTION IF EXISTS ";
private static final String CALL_CHECK_VERSION = CALL_REGEX + CHECK_VERSION;
private static final String FUNCTION = "function: {}";
private static final String DROP_STATEMENT = "drop statement: {}";
private static final String QUERY = "query: {}";
private static final String SUCCESSFULLY_EXECUTED = "Successfully executed ";
private static final String FAILED_TO_EXECUTE = "Failed to execute ";
private static final String FAILED_DUE_TO = " due to: {}";
protected static final String SUCCESSFULLY_EXECUTED_FUNCTION = SUCCESSFULLY_EXECUTED + FUNCTION;
protected static final String FAILED_TO_EXECUTE_FUNCTION_DUE_TO = FAILED_TO_EXECUTE + FUNCTION + FAILED_DUE_TO;
protected static final String SUCCESSFULLY_EXECUTED_DROP_STATEMENT = SUCCESSFULLY_EXECUTED + DROP_STATEMENT;
protected static final String FAILED_TO_EXECUTE_DROP_STATEMENT = FAILED_TO_EXECUTE + DROP_STATEMENT + FAILED_DUE_TO;
protected static final String SUCCESSFULLY_EXECUTED_QUERY = SUCCESSFULLY_EXECUTED + QUERY;
protected static final String FAILED_TO_EXECUTE_QUERY = FAILED_TO_EXECUTE + QUERY + FAILED_DUE_TO;
protected static final String DROP_PROCEDURE_IF_EXISTS = "DROP PROCEDURE IF EXISTS ";
protected static final String DROP_PROCEDURE_CHECK_VERSION = DROP_PROCEDURE_IF_EXISTS + CHECK_VERSION_TO_DELETE;
@Value("${spring.datasource.url}")
protected String dbUrl;
@ -78,23 +61,22 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
log.info("Check the current PostgreSQL version...");
boolean versionValid = false;
try {
CallableStatement callableStatement = conn.prepareCall("{? = " + CALL_CHECK_VERSION + " }");
callableStatement.registerOutParameter(1, Types.BOOLEAN);
callableStatement.execute();
versionValid = callableStatement.getBoolean(1);
callableStatement.close();
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(CALL_REGEX + CHECK_VERSION);
resultSet.next();
versionValid = resultSet.getBoolean(1);
statement.close();
} catch (Exception e) {
log.info("Failed to check current PostgreSQL version due to: {}", e.getMessage());
}
return versionValid;
}
protected void executeFunction(Connection conn, String query) {
log.info("{} ... ", query);
protected void executeQuery(Connection conn, String query) {
try {
CallableStatement callableStatement = conn.prepareCall("{" + query + "}");
callableStatement.execute();
SQLWarning warnings = callableStatement.getWarnings();
Statement statement = conn.createStatement();
statement.execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
SQLWarning warnings = statement.getWarnings();
if (warnings != null) {
log.info("{}", warnings.getMessage());
SQLWarning nextWarning = warnings.getNextWarning();
@ -103,31 +85,10 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
nextWarning = nextWarning.getNextWarning();
}
}
callableStatement.close();
log.info(SUCCESSFULLY_EXECUTED_FUNCTION, query.replace(CALL_REGEX, ""));
Thread.sleep(2000);
} catch (Exception e) {
log.info(FAILED_TO_EXECUTE_FUNCTION_DUE_TO, query, e.getMessage());
}
}
protected void executeDropStatement(Connection conn, String query) {
try {
conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
log.info(SUCCESSFULLY_EXECUTED_DROP_STATEMENT, query);
Thread.sleep(5000);
log.info("Successfully executed query: {}", query);
} catch (InterruptedException | SQLException e) {
log.info(FAILED_TO_EXECUTE_DROP_STATEMENT, query, e.getMessage());
}
}
protected void executeQuery(Connection conn, String query) {
try {
conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
log.info(SUCCESSFULLY_EXECUTED_QUERY, query);
Thread.sleep(5000);
} catch (InterruptedException | SQLException e) {
log.info(FAILED_TO_EXECUTE_QUERY, query, e.getMessage());
log.info("Failed to execute query: {} due to: {}", query, e.getMessage());
}
}

View File

@ -57,14 +57,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
private static final String DROP_TABLE_TS_KV_OLD = DROP_TABLE + TS_KV_OLD;
private static final String DROP_TABLE_TS_KV_LATEST_OLD = DROP_TABLE + TS_KV_LATEST_OLD;
private static final String DROP_FUNCTION_CHECK_VERSION = DROP_FUNCTION_IF_EXISTS + CHECK_VERSION;
private static final String DROP_FUNCTION_CREATE_PARTITION_TS_KV_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_PARTITION_TS_KV_TABLE;
private static final String DROP_FUNCTION_CREATE_NEW_TS_KV_LATEST_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_NEW_TS_KV_LATEST_TABLE;
private static final String DROP_FUNCTION_CREATE_PARTITIONS = DROP_FUNCTION_IF_EXISTS + CREATE_PARTITIONS;
private static final String DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
private static final String DROP_FUNCTION_INSERT_INTO_DICTIONARY = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_DICTIONARY;
private static final String DROP_FUNCTION_INSERT_INTO_TS_KV = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV;
private static final String DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
private static final String DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_PARTITION_TS_KV_TABLE;
private static final String DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_NEW_TS_KV_LATEST_TABLE;
private static final String DROP_PROCEDURE_CREATE_PARTITIONS = DROP_PROCEDURE_IF_EXISTS + CREATE_PARTITIONS;
private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
@Override
public void upgradeDatabase(String fromVersion) throws Exception {
@ -76,30 +75,30 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
loadSql(conn);
boolean versionValid = checkVersion(conn);
if (!versionValid) {
log.info("PostgreSQL version should be at least more than 10!");
log.info("PostgreSQL version should be at least more than 11!");
log.info("Please upgrade your PostgreSQL and restart the script!");
} else {
log.info("PostgreSQL version is valid!");
log.info("Updating schema ...");
executeFunction(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
executeFunction(conn, CALL_CREATE_PARTITIONS);
executeFunction(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
executeFunction(conn, CALL_INSERT_INTO_DICTIONARY);
executeFunction(conn, CALL_INSERT_INTO_TS_KV);
executeFunction(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
executeFunction(conn, CALL_INSERT_INTO_TS_KV_LATEST);
executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
executeQuery(conn, CALL_CREATE_PARTITIONS);
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
executeDropStatement(conn, DROP_TABLE_TS_KV_OLD);
executeDropStatement(conn, DROP_TABLE_TS_KV_LATEST_OLD);
executeQuery(conn, DROP_TABLE_TS_KV_OLD);
executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD);
executeDropStatement(conn, DROP_FUNCTION_CHECK_VERSION);
executeDropStatement(conn, DROP_FUNCTION_CREATE_PARTITION_TS_KV_TABLE);
executeDropStatement(conn, DROP_FUNCTION_CREATE_PARTITIONS);
executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_DICTIONARY);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV);
executeDropStatement(conn, DROP_FUNCTION_CREATE_NEW_TS_KV_LATEST_TABLE);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST);
executeQuery(conn, DROP_PROCEDURE_CHECK_VERSION);
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITION_TS_KV_TABLE);
executeQuery(conn, DROP_PROCEDURE_CREATE_PARTITIONS);
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;");
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;");

View File

@ -45,13 +45,13 @@ public class TimescaleTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaS
private long chunkTimeInterval;
public TimescaleTsDatabaseSchemaService() {
super("schema-timescale.sql", "schema-timescale-idx.sql");
super("schema-timescale.sql", null);
}
@Override
public void createDatabaseSchema() throws Exception {
super.createDatabaseSchema();
executeQuery("SELECT create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
executeQuery("SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
}
private void executeQuery(String query) {

View File

@ -43,27 +43,27 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
private static final String TENANT_TS_KV_OLD_TABLE = "tenant_ts_kv_old;";
private static final String CREATE_TS_KV_LATEST_TABLE = "create_ts_kv_latest_table()";
private static final String CREATE_NEW_TENANT_TS_KV_TABLE = "create_new_tenant_ts_kv_table()";
private static final String CREATE_NEW_TS_KV_TABLE = "create_new_ts_kv_table()";
private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()";
private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
private static final String INSERT_INTO_TENANT_TS_KV = "insert_into_tenant_ts_kv()";
private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv()";
private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest()";
private static final String CALL_CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_TS_KV_LATEST_TABLE;
private static final String CALL_CREATE_NEW_TENANT_TS_KV_TABLE = CALL_REGEX + CREATE_NEW_TENANT_TS_KV_TABLE;
private static final String CALL_CREATE_NEW_TENANT_TS_KV_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_TABLE;
private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE;
private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY;
private static final String CALL_INSERT_INTO_TS_KV = CALL_REGEX + INSERT_INTO_TENANT_TS_KV;
private static final String CALL_INSERT_INTO_TS_KV = CALL_REGEX + INSERT_INTO_TS_KV;
private static final String CALL_INSERT_INTO_TS_KV_LATEST = CALL_REGEX + INSERT_INTO_TS_KV_LATEST;
private static final String DROP_OLD_TENANT_TS_KV_TABLE = DROP_TABLE + TENANT_TS_KV_OLD_TABLE;
private static final String DROP_FUNCTION_CREATE_TS_KV_LATEST_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_LATEST_TABLE;
private static final String DROP_FUNCTION_CREATE_TENANT_TS_KV_TABLE_COPY = DROP_FUNCTION_IF_EXISTS + CREATE_NEW_TENANT_TS_KV_TABLE;
private static final String DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE = DROP_FUNCTION_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
private static final String DROP_FUNCTION_INSERT_INTO_DICTIONARY = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_DICTIONARY;
private static final String DROP_FUNCTION_INSERT_INTO_TENANT_TS_KV = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TENANT_TS_KV;
private static final String DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST = DROP_FUNCTION_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
private static final String DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_LATEST_TABLE;
private static final String DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY = DROP_PROCEDURE_IF_EXISTS + CREATE_NEW_TS_KV_TABLE;
private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
private static final String DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
@Autowired
private InstallScripts installScripts;
@ -78,33 +78,31 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
loadSql(conn);
boolean versionValid = checkVersion(conn);
if (!versionValid) {
log.info("PostgreSQL version should be at least more than 9.6!");
log.info("PostgreSQL version should be at least more than 11!");
log.info("Please upgrade your PostgreSQL and restart the script!");
} else {
log.info("PostgreSQL version is valid!");
log.info("Updating schema ...");
executeFunction(conn, CALL_CREATE_TS_KV_LATEST_TABLE);
executeFunction(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE);
executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE);
executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE);
executeQuery(conn, "SELECT create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
executeQuery(conn, "SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);");
executeFunction(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
executeFunction(conn, CALL_INSERT_INTO_DICTIONARY);
executeFunction(conn, CALL_INSERT_INTO_TS_KV);
executeFunction(conn, CALL_INSERT_INTO_TS_KV_LATEST);
executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
executeQuery(conn, CALL_INSERT_INTO_TS_KV);
executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
//executeQuery(conn, "SELECT set_chunk_time_interval('tenant_ts_kv', " + chunkTimeInterval +");");
executeQuery(conn, DROP_OLD_TENANT_TS_KV_TABLE);
executeDropStatement(conn, DROP_OLD_TENANT_TS_KV_TABLE);
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_LATEST_TABLE);
executeQuery(conn, DROP_PROCEDURE_CREATE_TENANT_TS_KV_TABLE_COPY);
executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TENANT_TS_KV);
executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_LATEST_TABLE);
executeDropStatement(conn, DROP_FUNCTION_CREATE_TENANT_TS_KV_TABLE_COPY);
executeDropStatement(conn, DROP_FUNCTION_CREATE_TS_KV_DICTIONARY_TABLE);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_DICTIONARY);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TENANT_TS_KV);
executeDropStatement(conn, DROP_FUNCTION_INSERT_INTO_TS_KV_LATEST);
executeQuery(conn, "ALTER TABLE tenant_ts_kv ADD COLUMN json_v json;");
executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN json_v json;");
executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN json_v json;");
log.info("schema timeseries updated!");

View File

@ -36,6 +36,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLU
import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.JSON_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@ -53,6 +54,10 @@ public abstract class AbstractTsKvEntity implements ToData<TsKvEntry> {
@Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid")
protected UUID entityId;
@Id
@Column(name = KEY_COLUMN)
protected int key;
@Id
@Column(name = TS_COLUMN)
protected Long ts;

View File

@ -69,10 +69,6 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
})
public final class TsKvLatestEntity extends AbstractTsKvEntity {
@Id
@Column(name = KEY_COLUMN)
private int key;
@Override
public boolean isNotEmpty() {
return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null;

View File

@ -31,7 +31,6 @@ public class TimescaleTsKvCompositeKey implements Serializable {
@Transient
private static final long serialVersionUID = -4089175869616037523L;
private UUID tenantId;
private UUID entityId;
private int key;
private long ts;

View File

@ -18,25 +18,18 @@ package org.thingsboard.server.dao.model.sqlts.timescale.ts;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.ColumnResult;
import javax.persistence.ConstructorResult;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.NamedNativeQueries;
import javax.persistence.NamedNativeQuery;
import javax.persistence.SqlResultSetMapping;
import javax.persistence.SqlResultSetMappings;
import javax.persistence.Table;
import java.util.UUID;
import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TENANT_ID_COLUMN;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG_QUERY;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_COUNT;
@ -52,7 +45,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@Table(name = "tenant_ts_kv")
@Table(name = "ts_kv")
@IdClass(TimescaleTsKvCompositeKey.class)
@SqlResultSetMappings({
@SqlResultSetMapping(
@ -116,15 +109,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F
resultSetMapping = "timescaleCountMapping"
)
})
public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
@Id
@Column(name = TENANT_ID_COLUMN, columnDefinition = "uuid")
private UUID tenantId;
@Id
@Column(name = KEY_COLUMN)
private int key;
public final class TimescaleTsKvEntity extends AbstractTsKvEntity {
public TimescaleTsKvEntity() {
}

View File

@ -32,10 +32,6 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
@IdClass(TsKvCompositeKey.class)
public final class TsKvEntity extends AbstractTsKvEntity {
@Id
@Column(name = KEY_COLUMN)
private int key;
public TsKvEntity() {
}

View File

@ -96,7 +96,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
return getRemoveLatestFuture(entityId, query);
}
@Override
@ -125,9 +125,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
protected ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
return findAllAsyncWithLimit(entityId, query);
} else {
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
@ -135,7 +135,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
long startTs = stepTs;
long endTs = stepTs + query.getInterval();
long ts = startTs + (endTs - startTs) / 2;
futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
stepTs = endTs;
}
return getTskvEntriesFuture(Futures.allAsList(futures));
@ -143,7 +143,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
Integer keyId = getOrSaveKeyId(query.getKey());
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),
@ -157,9 +157,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
}
protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
switchAggregation(entityId, key, startTs, endTs, aggregation, entitiesFutures);
return Futures.transform(setFutures(entitiesFutures), entity -> {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
@ -172,29 +172,29 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}, MoreExecutors.directExecutor());
}
protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void switchAggregation(EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
switch (aggregation) {
case AVG:
findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures);
findAvg(entityId, key, startTs, endTs, entitiesFutures);
break;
case MAX:
findMax(tenantId, entityId, key, startTs, endTs, entitiesFutures);
findMax(entityId, key, startTs, endTs, entitiesFutures);
break;
case MIN:
findMin(tenantId, entityId, key, startTs, endTs, entitiesFutures);
findMin(entityId, key, startTs, endTs, entitiesFutures);
break;
case SUM:
findSum(tenantId, entityId, key, startTs, endTs, entitiesFutures);
findSum(entityId, key, startTs, endTs, entitiesFutures);
break;
case COUNT:
findCount(tenantId, entityId, key, startTs, endTs, entitiesFutures);
findCount(entityId, key, startTs, endTs, entitiesFutures);
break;
default:
throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
}
}
protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void findCount(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findCount(
entityId.getId(),
@ -203,7 +203,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
endTs));
}
protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void findSum(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findSum(
entityId.getId(),
@ -212,7 +212,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
endTs));
}
protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void findMin(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMin(
entityId.getId(),
@ -226,7 +226,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
endTs));
}
protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void findMax(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMax(
entityId.getId(),
@ -240,7 +240,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
endTs));
}
protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
protected void findAvg(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findAvg(
entityId.getId(),

View File

@ -127,7 +127,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
List<ListenableFuture<List<TsKvEntry>>> futures = queries
.stream()
.map(query -> findAllAsync(tenantId, entityId, query))
.map(query -> findAllAsync(entityId, query))
.collect(Collectors.toList());
return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
@Nullable
@ -144,9 +144,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
}, service);
}
protected abstract ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
protected abstract ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query);
protected abstract ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
protected abstract ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query);
protected ListenableFuture<List<TsKvEntry>> getTskvEntriesFuture(ListenableFuture<List<Optional<TsKvEntry>>> future) {
return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
@ -164,12 +164,12 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
}, service);
}
protected ListenableFuture<List<TsKvEntry>> findNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
protected ListenableFuture<List<TsKvEntry>> findNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
long startTs = 0;
long endTs = query.getStartTs() - 1;
ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
Aggregation.NONE, DESC_ORDER);
return findAllAsync(tenantId, entityId, findNewLatestQuery);
return findAllAsync(entityId, findNewLatestQuery);
}
protected ListenableFuture<TsKvEntry> getFindLatestFuture(EntityId entityId, String key) {
@ -189,7 +189,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
return Futures.immediateFuture(result);
}
protected ListenableFuture<Void> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
protected ListenableFuture<Void> getRemoveLatestFuture(EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
@ -217,7 +217,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
if (query.getRewriteLatestIfDeleted()) {
ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
return getNewLatestEntryFuture(tenantId, entityId, query);
return getNewLatestEntryFuture(entityId, query);
}
return Futures.immediateFuture(null);
}, service);
@ -296,8 +296,8 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
return keyId;
}
private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(entityId, query);
return Futures.transformAsync(future, entryList -> {
if (entryList.size() == 1) {
return getSaveLatestFuture(entityId, entryList.get(0));

View File

@ -37,8 +37,8 @@ import java.util.List;
public class TimescaleInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository<TimescaleTsKvEntity> {
private static final String INSERT_OR_UPDATE =
"INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
"ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);";
"INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
"ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);";
@Override
public void saveOrUpdate(List<EntityContainer<TimescaleTsKvEntity>> entities) {
@ -46,41 +46,40 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TimescaleTsKvEntity tsKvEntity = entities.get(i).getEntity();
ps.setObject(1, tsKvEntity.getTenantId());
ps.setObject(2, tsKvEntity.getEntityId());
ps.setInt(3, tsKvEntity.getKey());
ps.setLong(4, tsKvEntity.getTs());
ps.setObject(1, tsKvEntity.getEntityId());
ps.setInt(2, tsKvEntity.getKey());
ps.setLong(3, tsKvEntity.getTs());
if (tsKvEntity.getBooleanValue() != null) {
ps.setBoolean(5, tsKvEntity.getBooleanValue());
ps.setBoolean(10, tsKvEntity.getBooleanValue());
ps.setBoolean(4, tsKvEntity.getBooleanValue());
ps.setBoolean(9, tsKvEntity.getBooleanValue());
} else {
ps.setNull(5, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
ps.setNull(4, Types.BOOLEAN);
ps.setNull(9, Types.BOOLEAN);
}
ps.setString(6, replaceNullChars(tsKvEntity.getStrValue()));
ps.setString(11, replaceNullChars(tsKvEntity.getStrValue()));
ps.setString(5, replaceNullChars(tsKvEntity.getStrValue()));
ps.setString(10, replaceNullChars(tsKvEntity.getStrValue()));
if (tsKvEntity.getLongValue() != null) {
ps.setLong(7, tsKvEntity.getLongValue());
ps.setLong(12, tsKvEntity.getLongValue());
ps.setLong(6, tsKvEntity.getLongValue());
ps.setLong(11, tsKvEntity.getLongValue());
} else {
ps.setNull(7, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
ps.setNull(6, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
if (tsKvEntity.getDoubleValue() != null) {
ps.setDouble(8, tsKvEntity.getDoubleValue());
ps.setDouble(13, tsKvEntity.getDoubleValue());
ps.setDouble(7, tsKvEntity.getDoubleValue());
ps.setDouble(12, tsKvEntity.getDoubleValue());
} else {
ps.setNull(8, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
ps.setNull(7, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
ps.setString(9, replaceNullChars(tsKvEntity.getJsonValue()));
ps.setString(14, replaceNullChars(tsKvEntity.getJsonValue()));
ps.setString(8, replaceNullChars(tsKvEntity.getJsonValue()));
ps.setString(13, replaceNullChars(tsKvEntity.getJsonValue()));
}
@Override

View File

@ -36,7 +36,7 @@ public class AggregationRepository {
public static final String FIND_SUM = "findSum";
public static final String FIND_COUNT = "findCount";
public static final String FROM_WHERE_CLAUSE = "FROM tenant_ts_kv tskv WHERE tskv.tenant_id = cast(:tenantId AS uuid) AND tskv.entity_id = cast(:entityId AS uuid) AND tskv.key= cast(:entityKey AS int) AND tskv.ts > :startTs AND tskv.ts <= :endTs GROUP BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket ORDER BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket";
public static final String FROM_WHERE_CLAUSE = "FROM ts_kv tskv WHERE tskv.entity_id = cast(:entityId AS uuid) AND tskv.key= cast(:entityKey AS int) AND tskv.ts > :startTs AND tskv.ts <= :endTs GROUP BY tskv.entity_id, tskv.key, tsBucket ORDER BY tskv.entity_id, tskv.key, tsBucket";
public static final String FIND_AVG_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(COALESCE(tskv.long_v, 0)) AS longValue, SUM(COALESCE(tskv.dbl_v, 0.0)) AS doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, null AS strValue, 'AVG' AS aggType ";
@ -52,43 +52,42 @@ public class AggregationRepository {
private EntityManager entityManager;
@Async
public CompletableFuture<List<TimescaleTsKvEntity>> findAvg(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
public CompletableFuture<List<TimescaleTsKvEntity>> findAvg(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
@SuppressWarnings("unchecked")
List<TimescaleTsKvEntity> resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_AVG);
List<TimescaleTsKvEntity> resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_AVG);
return CompletableFuture.supplyAsync(() -> resultList);
}
@Async
public CompletableFuture<List<TimescaleTsKvEntity>> findMax(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
public CompletableFuture<List<TimescaleTsKvEntity>> findMax(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
@SuppressWarnings("unchecked")
List<TimescaleTsKvEntity> resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_MAX);
List<TimescaleTsKvEntity> resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_MAX);
return CompletableFuture.supplyAsync(() -> resultList);
}
@Async
public CompletableFuture<List<TimescaleTsKvEntity>> findMin(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
public CompletableFuture<List<TimescaleTsKvEntity>> findMin(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
@SuppressWarnings("unchecked")
List<TimescaleTsKvEntity> resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_MIN);
List<TimescaleTsKvEntity> resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_MIN);
return CompletableFuture.supplyAsync(() -> resultList);
}
@Async
public CompletableFuture<List<TimescaleTsKvEntity>> findSum(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
public CompletableFuture<List<TimescaleTsKvEntity>> findSum(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
@SuppressWarnings("unchecked")
List<TimescaleTsKvEntity> resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_SUM);
List<TimescaleTsKvEntity> resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_SUM);
return CompletableFuture.supplyAsync(() -> resultList);
}
@Async
public CompletableFuture<List<TimescaleTsKvEntity>> findCount(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
public CompletableFuture<List<TimescaleTsKvEntity>> findCount(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs) {
@SuppressWarnings("unchecked")
List<TimescaleTsKvEntity> resultList = getResultList(tenantId, entityId, entityKey, timeBucket, startTs, endTs, FIND_COUNT);
List<TimescaleTsKvEntity> resultList = getResultList(entityId, entityKey, timeBucket, startTs, endTs, FIND_COUNT);
return CompletableFuture.supplyAsync(() -> resultList);
}
private List getResultList(UUID tenantId, UUID entityId, int entityKey, long timeBucket, long startTs, long endTs, String query) {
private List getResultList(UUID entityId, int entityKey, long timeBucket, long startTs, long endTs, String query) {
return entityManager.createNamedQuery(query)
.setParameter("tenantId", tenantId)
.setParameter("entityId", entityId)
.setParameter("entityKey", entityKey)
.setParameter("timeBucket", timeBucket)

View File

@ -88,24 +88,23 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
protected ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
return findAllAsyncWithLimit(entityId, query);
} else {
long startTs = query.getStartTs();
long endTs = query.getEndTs();
long timeBucket = query.getInterval();
ListenableFuture<List<Optional<TsKvEntry>>> future = findAllAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
ListenableFuture<List<Optional<TsKvEntry>>> future = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
return getTskvEntriesFuture(future);
}
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
String strKey = query.getKey();
Integer keyId = getOrSaveKeyId(strKey);
List<TimescaleTsKvEntity> timescaleTsKvEntities = tsKvRepository.findAllWithLimit(
tenantId.getId(),
entityId.getId(),
keyId,
query.getStartTs(),
@ -117,8 +116,8 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities));
}
private ListenableFuture<List<Optional<TsKvEntry>>> findAllAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {
CompletableFuture<List<TimescaleTsKvEntity>> listCompletableFuture = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId(), tenantId.getId());
private ListenableFuture<List<Optional<TsKvEntry>>> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {
CompletableFuture<List<TimescaleTsKvEntity>> listCompletableFuture = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId());
SettableFuture<List<TimescaleTsKvEntity>> listenableFuture = SettableFuture.create();
listCompletableFuture.whenComplete((timescaleTsKvEntities, throwable) -> {
if (throwable != null) {
@ -133,7 +132,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
timescaleTsKvEntities.forEach(entity -> {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setTenantId(tenantId.getId());
entity.setStrKey(key);
result.add(Optional.of(DaoUtil.getData(entity)));
} else {
@ -167,7 +165,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
String strKey = tsKvEntry.getKey();
Integer keyId = getOrSaveKeyId(strKey);
TimescaleTsKvEntity entity = new TimescaleTsKvEntity();
entity.setTenantId(tenantId.getId());
entity.setEntityId(entityId.getId());
entity.setTs(tsKvEntry.getTs());
entity.setKey(keyId);
@ -197,7 +194,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
Integer keyId = getOrSaveKeyId(strKey);
return service.submit(() -> {
tsKvRepository.delete(
tenantId.getId(),
entityId.getId(),
keyId,
query.getStartTs(),
@ -208,7 +204,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
return getRemoveLatestFuture(entityId, query);
}
@Override
@ -216,27 +212,26 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
return service.submit(() -> null);
}
private CompletableFuture<List<TimescaleTsKvEntity>> switchAggregation(String key, long startTs, long endTs, long timeBucket, Aggregation aggregation, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> switchAggregation(String key, long startTs, long endTs, long timeBucket, Aggregation aggregation, UUID entityId) {
switch (aggregation) {
case AVG:
return findAvg(key, startTs, endTs, timeBucket, entityId, tenantId);
return findAvg(key, startTs, endTs, timeBucket, entityId);
case MAX:
return findMax(key, startTs, endTs, timeBucket, entityId, tenantId);
return findMax(key, startTs, endTs, timeBucket, entityId);
case MIN:
return findMin(key, startTs, endTs, timeBucket, entityId, tenantId);
return findMin(key, startTs, endTs, timeBucket, entityId);
case SUM:
return findSum(key, startTs, endTs, timeBucket, entityId, tenantId);
return findSum(key, startTs, endTs, timeBucket, entityId);
case COUNT:
return findCount(key, startTs, endTs, timeBucket, entityId, tenantId);
return findCount(key, startTs, endTs, timeBucket, entityId);
default:
throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
}
}
private CompletableFuture<List<TimescaleTsKvEntity>> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId) {
Integer keyId = getOrSaveKeyId(key);
return aggregationRepository.findCount(
tenantId,
entityId,
keyId,
timeBucket,
@ -244,10 +239,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
endTs);
}
private CompletableFuture<List<TimescaleTsKvEntity>> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId) {
Integer keyId = getOrSaveKeyId(key);
return aggregationRepository.findSum(
tenantId,
entityId,
keyId,
timeBucket,
@ -255,10 +249,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
endTs);
}
private CompletableFuture<List<TimescaleTsKvEntity>> findMin(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> findMin(String key, long startTs, long endTs, long timeBucket, UUID entityId) {
Integer keyId = getOrSaveKeyId(key);
return aggregationRepository.findMin(
tenantId,
entityId,
keyId,
timeBucket,
@ -266,10 +259,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
endTs);
}
private CompletableFuture<List<TimescaleTsKvEntity>> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId) {
Integer keyId = getOrSaveKeyId(key);
return aggregationRepository.findMax(
tenantId,
entityId,
keyId,
timeBucket,
@ -277,10 +269,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
endTs);
}
private CompletableFuture<List<TimescaleTsKvEntity>> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
private CompletableFuture<List<TimescaleTsKvEntity>> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId) {
Integer keyId = getOrSaveKeyId(key);
return aggregationRepository.findAvg(
tenantId,
entityId,
keyId,
timeBucket,

View File

@ -31,12 +31,10 @@ import java.util.UUID;
@TimescaleDBTsDao
public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEntity, TimescaleTsKvCompositeKey> {
@Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.tenantId = :tenantId " +
"AND tskv.entityId = :entityId " +
@Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey " +
"AND tskv.ts > :startTs AND tskv.ts <= :endTs")
List<TimescaleTsKvEntity> findAllWithLimit(
@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("entityKey") int key,
@Param("startTs") long startTs,
@ -44,12 +42,10 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt
@Transactional
@Modifying
@Query("DELETE FROM TimescaleTsKvEntity tskv WHERE tskv.tenantId = :tenantId " +
"AND tskv.entityId = :entityId " +
@Query("DELETE FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey " +
"AND tskv.ts > :startTs AND tskv.ts <= :endTs")
void delete(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
void delete(@Param("entityId") UUID entityId,
@Param("entityKey") int key,
@Param("startTs") long startTs,
@Param("endTs") long endTs);

View File

@ -1,17 +0,0 @@
--
-- Copyright © 2016-2020 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts);

View File

@ -16,8 +16,7 @@
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE TABLE IF NOT EXISTS tenant_ts_kv (
tenant_id uuid NOT NULL,
CREATE TABLE IF NOT EXISTS ts_kv (
entity_id uuid NOT NULL,
key int NOT NULL,
ts bigint NOT NULL,
@ -26,7 +25,7 @@ CREATE TABLE IF NOT EXISTS tenant_ts_kv (
long_v bigint,
dbl_v double precision,
json_v json,
CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY (tenant_id, entity_id, key, ts)
CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts)
);
CREATE TABLE IF NOT EXISTS ts_kv_dictionary (

View File

@ -44,7 +44,7 @@ public class SqlDaoServiceTestSuite {
// @ClassRule
// public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
// Arrays.asList("sql/schema-timescale.sql", "sql/schema-timescale-idx.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"),
// Arrays.asList("sql/schema-timescale.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"),
// "sql/timescale/drop-all-tables.sql",
// "sql-test.properties"
// );

View File

@ -12,7 +12,7 @@ DROP TABLE IF EXISTS event;
DROP TABLE IF EXISTS relation;
DROP TABLE IF EXISTS tb_user;
DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS tenant_ts_kv;
DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_latest;
DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widget_type;