diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql index e10f3ee2bd..671d39aae5 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_ts.sql @@ -21,6 +21,8 @@ CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table() LANGUAGE plpgsql AS $ BEGIN ALTER TABLE ts_kv RENAME TO ts_kv_old; + ALTER TABLE ts_kv_old + RENAME CONSTRAINT ts_kv_pkey TO ts_kv_pkey_old; CREATE TABLE IF NOT EXISTS ts_kv ( LIKE ts_kv_old @@ -32,6 +34,8 @@ BEGIN ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid; ALTER TABLE ts_kv ALTER COLUMN key TYPE integer USING key::integer; + ALTER TABLE ts_kv + ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts); END; $$; @@ -59,33 +63,65 @@ BEGIN END; $$; +CREATE OR REPLACE FUNCTION get_partitions_data(IN partition_type varchar) + RETURNS + TABLE + ( + partition_date text, + from_ts bigint, + to_ts bigint + ) +AS +$$ +BEGIN + CASE + WHEN partition_type = 'DAYS' THEN + RETURN QUERY SELECT day_date.day AS partition_date, + (extract(epoch from (day_date.day)::timestamp) * 1000)::bigint AS from_ts, + (extract(epoch from (day_date.day::date + INTERVAL '1 DAY')::timestamp) * + 1000)::bigint AS to_ts + FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_MM_DD') AS day + FROM ts_kv_old) AS day_date; + WHEN partition_type = 'MONTHS' THEN + RETURN QUERY SELECT SUBSTRING(month_date.first_date, 1, 7) AS partition_date, + (extract(epoch from (month_date.first_date)::timestamp) * 1000)::bigint AS from_ts, + (extract(epoch from (month_date.first_date::date + INTERVAL '1 MONTH')::timestamp) * + 1000)::bigint AS to_ts + FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_MM_01') AS first_date + FROM ts_kv_old) AS month_date; + WHEN partition_type = 'YEARS' THEN + RETURN QUERY SELECT SUBSTRING(year_date.year, 1, 4) AS partition_date, + (extract(epoch from (year_date.year)::timestamp) * 1000)::bigint AS from_ts, + (extract(epoch from (year_date.year::date + INTERVAL '1 YEAR')::timestamp) * + 1000)::bigint AS to_ts + FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_01_01') AS year FROM ts_kv_old) AS year_date; + ELSE + RAISE EXCEPTION 'Failed to parse partitioning property: % !', partition_type; + END CASE; +END; +$$ LANGUAGE plpgsql; -- call create_partitions(); -CREATE OR REPLACE PROCEDURE create_partitions() LANGUAGE plpgsql AS $$ +CREATE OR REPLACE PROCEDURE create_partitions(IN partition_type varchar) LANGUAGE plpgsql AS $$ DECLARE partition_date varchar; from_ts bigint; to_ts bigint; - key_cursor CURSOR FOR select SUBSTRING(month_date.first_date, 1, 7) AS partition_date, - extract(epoch from (month_date.first_date)::timestamp) * 1000 as from_ts, - extract(epoch from (month_date.first_date::date + INTERVAL '1 MONTH')::timestamp) * - 1000 as to_ts - FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_MM_01') AS first_date - FROM ts_kv_old) AS month_date; + partitions_cursor CURSOR FOR SELECT * FROM get_partitions_data(partition_type); BEGIN - OPEN key_cursor; + OPEN partitions_cursor; LOOP - FETCH key_cursor INTO partition_date, from_ts, to_ts; + FETCH partitions_cursor INTO partition_date, from_ts, to_ts; EXIT WHEN NOT FOUND; EXECUTE 'CREATE TABLE IF NOT EXISTS ts_kv_' || partition_date || - ' PARTITION OF ts_kv(PRIMARY KEY (entity_id, key, ts)) FOR VALUES FROM (' || from_ts || + ' PARTITION OF ts_kv FOR VALUES FROM (' || from_ts || ') TO (' || to_ts || ');'; RAISE NOTICE 'A partition % has been created!',CONCAT('ts_kv_', partition_date); END LOOP; - CLOSE key_cursor; + CLOSE partitions_cursor; END; $$; diff --git a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java index ab03ca23ab..7afa422460 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java @@ -106,7 +106,8 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { Thread.sleep(2000); log.info("Successfully executed query: {}", query); } catch (InterruptedException | SQLException e) { - log.info("Failed to execute query: {} due to: {}", query, e.getMessage()); + log.error("Failed to execute query: {} due to: {}", query, e.getMessage()); + throw new RuntimeException("Failed to execute query:" + query + " due to: ", e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseSchemaService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseSchemaService.java index 15b1e45247..bcc3d9bb81 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseSchemaService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseSchemaService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.install; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.util.PsqlDao; @@ -24,9 +25,20 @@ import org.thingsboard.server.dao.util.SqlTsDao; @SqlTsDao @PsqlDao @Profile("install") -public class PsqlTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaService - implements TsDatabaseSchemaService { +public class PsqlTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaService implements TsDatabaseSchemaService { + + @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") + private String partitionType; + public PsqlTsDatabaseSchemaService() { super("schema-ts-psql.sql", null); } + + @Override + public void createDatabaseSchema() throws Exception { + super.createDatabaseSchema(); + if (partitionType.equals("INDEFINITE")) { + executeQuery("CREATE TABLE ts_kv_indefinite PARTITION OF ts_kv DEFAULT;"); + } + } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index 5f97a6eaa5..215eff736a 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.install; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.util.PsqlDao; @@ -33,6 +34,9 @@ import java.sql.DriverManager; @PsqlDao public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService { + @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") + private String partitionType; + private static final String LOAD_FUNCTIONS_SQL = "schema_update_psql_ts.sql"; private static final String LOAD_TTL_FUNCTIONS_SQL = "schema_update_ttl.sql"; private static final String LOAD_DROP_PARTITIONS_FUNCTIONS_SQL = "schema_update_psql_drop_partitions.sql"; @@ -50,7 +54,6 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe private static final String CALL_CREATE_PARTITION_TS_KV_TABLE = CALL_REGEX + CREATE_PARTITION_TS_KV_TABLE; private static final String CALL_CREATE_NEW_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_LATEST_TABLE; - private static final String CALL_CREATE_PARTITIONS = CALL_REGEX + CREATE_PARTITIONS; private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE; private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY; private static final String CALL_INSERT_INTO_TS_KV = CALL_REGEX + INSERT_INTO_TS_KV; @@ -66,6 +69,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY; private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV; private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; + private static final String DROP_FUNCTION_GET_PARTITION_DATA = "DROP FUNCTION IF EXISTS get_partitions_data;"; @Override public void upgradeDatabase(String fromVersion) throws Exception { @@ -83,7 +87,11 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe loadSql(conn, LOAD_FUNCTIONS_SQL); log.info("Updating timeseries schema ..."); executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); - executeQuery(conn, CALL_CREATE_PARTITIONS); + if (!partitionType.equals("INDEFINITE")) { + executeQuery(conn, "call create_partitions('" + partitionType + "')"); + } else { + executeQuery(conn, "CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;"); + } executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE); executeQuery(conn, CALL_INSERT_INTO_DICTIONARY); executeQuery(conn, CALL_INSERT_INTO_TS_KV); @@ -100,9 +108,14 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); + executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA); executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); executeQuery(conn, "ALTER TABLE ts_kv_latest ADD COLUMN IF NOT EXISTS json_v json;"); + } else { + executeQuery(conn, "ALTER TABLE ts_kv DROP CONSTRAINT IF EXISTS ts_kv_pkey;"); + executeQuery(conn, "ALTER TABLE ts_kv ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);"); } log.info("Load TTL functions ..."); diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlAbstractDatabaseSchemaService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlAbstractDatabaseSchemaService.java index b8c0d964e1..cfc32d31b3 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlAbstractDatabaseSchemaService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlAbstractDatabaseSchemaService.java @@ -25,6 +25,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; @Slf4j public abstract class SqlAbstractDatabaseSchemaService implements DatabaseSchemaService { @@ -73,4 +74,14 @@ public abstract class SqlAbstractDatabaseSchemaService implements DatabaseSchema } } + protected void executeQuery(String query) { + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + log.info("Successfully executed query: {}", query); + Thread.sleep(5000); + } catch (InterruptedException | SQLException e) { + log.info("Failed to execute query: {} due to: {}", query, e.getMessage()); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java index e8c542d956..e2633d6fc4 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseSchemaService.java @@ -33,14 +33,6 @@ import java.sql.SQLException; @Slf4j public class TimescaleTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaService implements TsDatabaseSchemaService { - private static final String QUERY = "query: {}"; - private static final String SUCCESSFULLY_EXECUTED = "Successfully executed "; - private static final String FAILED_TO_EXECUTE = "Failed to execute "; - private static final String FAILED_DUE_TO = " due to: {}"; - - private static final String SUCCESSFULLY_EXECUTED_QUERY = SUCCESSFULLY_EXECUTED + QUERY; - private static final String FAILED_TO_EXECUTE_QUERY = FAILED_TO_EXECUTE + QUERY + FAILED_DUE_TO; - @Value("${sql.timescale.chunk_time_interval:86400000}") private long chunkTimeInterval; @@ -54,15 +46,4 @@ public class TimescaleTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaS executeQuery("SELECT create_hypertable('ts_kv', 'ts', chunk_time_interval => " + chunkTimeInterval + ", if_not_exists => true);"); } - private void executeQuery(String query) { - try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { - conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - log.info(SUCCESSFULLY_EXECUTED_QUERY, query); - Thread.sleep(5000); - } catch (InterruptedException | SQLException e) { - log.info(FAILED_TO_EXECUTE_QUERY, query, e.getMessage()); - } - } - - } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java index daa74d013f..a903698e3c 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java @@ -18,10 +18,13 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; /** @@ -30,9 +33,22 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public abstract class AbstractJsInvokeService implements JsInvokeService { + protected ScheduledExecutorService timeoutExecutorService; protected Map scriptIdToNameMap = new ConcurrentHashMap<>(); protected Map blackListedFunctions = new ConcurrentHashMap<>(); + public void init(long maxRequestsTimeout) { + if (maxRequestsTimeout > 0) { + timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nashorn-js-timeout")); + } + } + + public void stop() { + if (timeoutExecutorService != null) { + timeoutExecutorService.shutdownNow(); + } + } + @Override public ListenableFuture eval(JsScriptType scriptType, String scriptBody, String... argNames) { UUID scriptId = UUID.randomUUID(); diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java index a9acfa4f42..5ea8d13270 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java @@ -48,7 +48,6 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer private NashornSandbox sandbox; private ScriptEngine engine; private ExecutorService monitorExecutorService; - private ScheduledExecutorService timeoutExecutorService; private final AtomicInteger jsPushedMsgs = new AtomicInteger(0); private final AtomicInteger jsInvokeMsgs = new AtomicInteger(0); @@ -85,9 +84,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer @PostConstruct public void init() { - if (maxRequestsTimeout > 0) { - timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nashorn-js-timeout")); - } + super.init(maxRequestsTimeout); if (useJsSandbox()) { sandbox = NashornSandboxes.create(); monitorExecutorService = Executors.newWorkStealingPool(getMonitorThreadPoolSize()); @@ -104,12 +101,10 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer @PreDestroy public void stop() { + super.stop(); if (monitorExecutorService != null) { monitorExecutorService.shutdownNow(); } - if (timeoutExecutorService != null) { - timeoutExecutorService.shutdownNow(); - } } protected abstract boolean useJsSandbox(); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index fe2e7e8071..663cdc978d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -87,11 +87,13 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @PostConstruct public void init() { + super.init(maxRequestsTimeout); requestTemplate.init(); } @PreDestroy public void destroy() { + super.stop(); if (requestTemplate != null) { requestTemplate.stop(); } @@ -111,7 +113,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { log.trace("Post compile request for scriptId [{}]", scriptId); ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); - + if (maxRequestsTimeout > 0) { + future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); + } kafkaPushedMsgs.incrementAndGet(); Futures.addCallback(future, new FutureCallback>() { @Override @@ -154,8 +158,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setTimeout((int) maxRequestsTimeout) .setScriptBody(scriptIdToBodysMap.get(scriptId)); - for (int i = 0; i < args.length; i++) { - jsRequestBuilder.addArgs(args[i].toString()); + for (Object arg : args) { + jsRequestBuilder.addArgs(arg.toString()); } JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() @@ -163,6 +167,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .build(); ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); + if (maxRequestsTimeout > 0) { + future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); + } kafkaPushedMsgs.incrementAndGet(); Futures.addCallback(future, new FutureCallback>() { @Override @@ -203,6 +210,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .build(); ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); + if (maxRequestsTimeout > 0) { + future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); + } JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java index 347eaee7eb..d8653e945d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java @@ -33,7 +33,7 @@ public class ControllerSqlTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties"); @BeforeClass diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java index 70de3b27ca..69a6e99353 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java @@ -32,7 +32,7 @@ public class MqttSqlTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties"); @BeforeClass diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java index 83ac5f7703..72484e3f21 100644 --- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java @@ -33,7 +33,7 @@ public class RuleEngineSqlTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties"); @BeforeClass diff --git a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java index 6060a2cc2b..36d24e9b00 100644 --- a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java @@ -34,7 +34,7 @@ public class SystemSqlTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties"); @BeforeClass diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index fafe05957f..c663d4346a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -54,7 +54,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Autowired protected InsertTsRepository insertRepository; - protected TbSqlBlockingQueue> tsQueue; + protected TbSqlBlockingQueue tsQueue; @PostConstruct protected void init() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java index 41ecfc3734..5d94d12a1a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java @@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -48,7 +47,7 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); log.trace("Saving entity: {}", entity); - return tsQueue.add(new EntityContainer(entity, null)); + return tsQueue.add(entity); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java index a2c066322a..9618d79230 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java @@ -16,12 +16,11 @@ package org.thingsboard.server.dao.sqlts.insert; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; -import org.thingsboard.server.dao.sqlts.EntityContainer; import java.util.List; public interface InsertTsRepository { - void saveOrUpdate(List> entities); + void saveOrUpdate(List entities); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java index 189758a947..5fd585aec8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java @@ -19,7 +19,6 @@ import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.HsqlDao; @@ -47,12 +46,11 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v, T.json_v);"; @Override - public void saveOrUpdate(List> entities) { + public void saveOrUpdate(List entities) { jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { - EntityContainer tsKvEntityEntityContainer = entities.get(i); - TsKvEntity tsKvEntity = tsKvEntityEntityContainer.getEntity(); + TsKvEntity tsKvEntity = entities.get(i); ps.setObject(1, tsKvEntity.getEntityId()); ps.setInt(2, tsKvEntity.getKey()); ps.setLong(3, tsKvEntity.getTs()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java index d4a5dd25b0..85028bb942 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java @@ -20,7 +20,6 @@ import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -28,10 +27,7 @@ import org.thingsboard.server.dao.util.SqlTsDao; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; @SqlTsDao @PsqlDao @@ -39,22 +35,15 @@ import java.util.Map; @Transactional public class PsqlInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository { - private static final String INSERT_INTO_TS_KV = "INSERT INTO ts_kv_"; - - private static final String VALUES_ON_CONFLICT_DO_UPDATE = " (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES (?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + + private static final String INSERT_ON_CONFLICT_DO_UPDATE = "INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES (?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; @Override - public void saveOrUpdate(List> entities) { - Map> partitionMap = new HashMap<>(); - for (EntityContainer entityContainer : entities) { - List tsKvEntities = partitionMap.computeIfAbsent(entityContainer.getPartitionDate(), k -> new ArrayList<>()); - tsKvEntities.add(entityContainer.getEntity()); - } - partitionMap.forEach((partition, entries) -> jdbcTemplate.batchUpdate(getInsertOrUpdateQuery(partition), new BatchPreparedStatementSetter() { + public void saveOrUpdate(List entities) { + jdbcTemplate.batchUpdate(INSERT_ON_CONFLICT_DO_UPDATE, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { - TsKvEntity tsKvEntity = entries.get(i); + TsKvEntity tsKvEntity = entities.get(i); ps.setObject(1, tsKvEntity.getEntityId()); ps.setInt(2, tsKvEntity.getKey()); ps.setLong(3, tsKvEntity.getTs()); @@ -93,12 +82,9 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements @Override public int getBatchSize() { - return entries.size(); + return entities.size(); } - })); + }); } - private String getInsertOrUpdateQuery(String partitionDate) { - return INSERT_INTO_TS_KV + partitionDate + VALUES_ON_CONFLICT_DO_UPDATE; - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java index 1fa1fc4219..da91982f5d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java @@ -20,7 +20,6 @@ import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -41,11 +40,11 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; @Override - public void saveOrUpdate(List> entities) { + public void saveOrUpdate(List entities) { jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { - TimescaleTsKvEntity tsKvEntity = entities.get(i).getEntity(); + TimescaleTsKvEntity tsKvEntity = entities.get(i); ps.setObject(1, tsKvEntity.getEntityId()); ps.setInt(2, tsKvEntity.getKey()); ps.setLong(3, tsKvEntity.getTs()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java index 29cdd64918..e0854a0a19 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java @@ -25,7 +25,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository; import org.thingsboard.server.dao.timeseries.PsqlPartition; import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; @@ -42,8 +41,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; -import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_START; - @Component @Slf4j @@ -58,7 +55,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa private PsqlPartitioningRepository partitioningRepository; private SqlTsPartitionDate tsFormat; - private PsqlPartition indefinitePartition; @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") private String partitioning; @@ -69,10 +65,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa Optional partition = SqlTsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); - if (tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) { - indefinitePartition = new PsqlPartition(toMills(EPOCH_START), Long.MAX_VALUE, tsFormat.getPattern()); - savePartition(indefinitePartition); - } } else { log.warn("Incorrect configuration of partitioning {}", partitioning); throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); @@ -81,6 +73,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { + savePartitionIfNotExist(tsKvEntry.getTs()); String strKey = tsKvEntry.getKey(); Integer keyId = getOrSaveKeyId(strKey); TsKvEntity entity = new TsKvEntity(); @@ -92,9 +85,23 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null)); - PsqlPartition psqlPartition = toPartition(tsKvEntry.getTs()); log.trace("Saving entity: {}", entity); - return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); + return tsQueue.add(entity); + } + + private void savePartitionIfNotExist(long ts) { + if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + LocalDateTime localDateTimeStart = tsFormat.trancateTo(time); + long partitionStartTs = toMills(localDateTimeStart); + if (partitions.get(partitionStartTs) == null) { + LocalDateTime localDateTimeEnd = tsFormat.plusTo(localDateTimeStart); + long partitionEndTs = toMills(localDateTimeEnd); + ZonedDateTime zonedDateTime = localDateTimeStart.atZone(ZoneOffset.UTC); + String partitionDate = zonedDateTime.format(DateTimeFormatter.ofPattern(tsFormat.getPattern())); + savePartition(new PsqlPartition(partitionStartTs, partitionEndTs, partitionDate)); + } + } } private void savePartition(PsqlPartition psqlPartition) { @@ -111,28 +118,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa } } - private PsqlPartition toPartition(long ts) { - if (tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) { - return indefinitePartition; - } else { - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); - LocalDateTime localDateTimeStart = tsFormat.trancateTo(time); - long partitionStartTs = toMills(localDateTimeStart); - PsqlPartition partition = partitions.get(partitionStartTs); - if (partition != null) { - return partition; - } else { - LocalDateTime localDateTimeEnd = tsFormat.plusTo(localDateTimeStart); - long partitionEndTs = toMills(localDateTimeEnd); - ZonedDateTime zonedDateTime = localDateTimeStart.atZone(ZoneOffset.UTC); - String partitionDate = zonedDateTime.format(DateTimeFormatter.ofPattern(tsFormat.getPattern())); - partition = new PsqlPartition(partitionStartTs, partitionEndTs, partitionDate); - savePartition(partition); - return partition; - } - } - } - private static long toMills(LocalDateTime time) { return time.toInstant(ZoneOffset.UTC).toEpochMilli(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 0a2e210648..d9121f684e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -36,7 +36,6 @@ import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; -import org.thingsboard.server.dao.sqlts.EntityContainer; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -64,7 +63,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Autowired protected InsertTsRepository insertRepository; - protected TbSqlBlockingQueue> tsQueue; + protected TbSqlBlockingQueue tsQueue; @PostConstruct protected void init() { @@ -175,7 +174,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null)); log.trace("Saving entity to timescale db: {}", entity); - return tsQueue.add(new EntityContainer(entity, null)); + return tsQueue.add(entity); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/PsqlPartition.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/PsqlPartition.java index 09f5312498..ffb7dec97e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/PsqlPartition.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/PsqlPartition.java @@ -35,6 +35,6 @@ public class PsqlPartition { } private String createStatement(long start, long end, String partitionDate) { - return "CREATE TABLE IF NOT EXISTS " + TABLE_REGEX + partitionDate + " PARTITION OF ts_kv(PRIMARY KEY (entity_id, key, ts)) FOR VALUES FROM (" + start + ") TO (" + end + ")"; + return "CREATE TABLE IF NOT EXISTS " + TABLE_REGEX + partitionDate + " PARTITION OF ts_kv FOR VALUES FROM (" + start + ") TO (" + end + ")"; } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SqlTsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SqlTsPartitionDate.java index ceede6f19c..7edbafd816 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SqlTsPartitionDate.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SqlTsPartitionDate.java @@ -23,7 +23,7 @@ import java.util.Optional; public enum SqlTsPartitionDate { - MINUTES("yyyy_MM_dd_HH_mm", ChronoUnit.MINUTES), HOURS("yyyy_MM_dd_HH", ChronoUnit.HOURS), DAYS("yyyy_MM_dd", ChronoUnit.DAYS), MONTHS("yyyy_MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS), INDEFINITE("indefinite", ChronoUnit.FOREVER); + DAYS("yyyy_MM_dd", ChronoUnit.DAYS), MONTHS("yyyy_MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS), INDEFINITE("indefinite", ChronoUnit.FOREVER); private final String pattern; private final transient TemporalUnit truncateUnit; @@ -44,10 +44,6 @@ public enum SqlTsPartitionDate { public LocalDateTime trancateTo(LocalDateTime time) { switch (this) { - case MINUTES: - return time.truncatedTo(ChronoUnit.MINUTES); - case HOURS: - return time.truncatedTo(ChronoUnit.HOURS); case DAYS: return time.truncatedTo(ChronoUnit.DAYS); case MONTHS: @@ -63,10 +59,6 @@ public enum SqlTsPartitionDate { public LocalDateTime plusTo(LocalDateTime time) { switch (this) { - case MINUTES: - return time.plusMinutes(1); - case HOURS: - return time.plusHours(1); case DAYS: return time.plusDays(1); case MONTHS: diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index f83e80931b..6f6177de01 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -23,7 +23,8 @@ CREATE TABLE IF NOT EXISTS ts_kv str_v varchar(10000000), long_v bigint, dbl_v double precision, - json_v json + json_v json, + CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts) ) PARTITION BY RANGE (ts); CREATE TABLE IF NOT EXISTS ts_kv_latest diff --git a/dao/src/test/java/org/thingsboard/server/dao/JpaDaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/JpaDaoTestSuite.java index 0af8603bf1..23ec159a4b 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/JpaDaoTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/JpaDaoTestSuite.java @@ -31,14 +31,14 @@ public class JpaDaoTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties" ); // @ClassRule // public static CustomSqlUnit sqlUnit = new CustomSqlUnit( // Arrays.asList("sql/schema-ts-psql.sql", "sql/schema-entities.sql", "sql/system-data.sql"), -// "sql/drop-all-tables.sql", +// "sql/psql/drop-all-tables.sql", // "sql-test.properties" // ); diff --git a/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java index 6d306c4e71..32fd45c188 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/SqlDaoServiceTestSuite.java @@ -31,14 +31,14 @@ public class SqlDaoServiceTestSuite { @ClassRule public static CustomSqlUnit sqlUnit = new CustomSqlUnit( Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), - "sql/drop-all-tables.sql", + "sql/hsql/drop-all-tables.sql", "sql-test.properties" ); // @ClassRule // public static CustomSqlUnit sqlUnit = new CustomSqlUnit( // Arrays.asList("sql/schema-ts-psql.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), -// "sql/drop-all-tables.sql", +// "sql/psql/drop-all-tables.sql", // "sql-test.properties" // ); diff --git a/dao/src/test/resources/sql/drop-all-tables.sql b/dao/src/test/resources/sql/hsql/drop-all-tables.sql similarity index 100% rename from dao/src/test/resources/sql/drop-all-tables.sql rename to dao/src/test/resources/sql/hsql/drop-all-tables.sql diff --git a/dao/src/test/resources/sql/psql/drop-all-tables.sql b/dao/src/test/resources/sql/psql/drop-all-tables.sql new file mode 100644 index 0000000000..b2e4a27963 --- /dev/null +++ b/dao/src/test/resources/sql/psql/drop-all-tables.sql @@ -0,0 +1,24 @@ +DROP TABLE IF EXISTS admin_settings; +DROP TABLE IF EXISTS alarm; +DROP TABLE IF EXISTS asset; +DROP TABLE IF EXISTS audit_log; +DROP TABLE IF EXISTS attribute_kv; +DROP TABLE IF EXISTS component_descriptor; +DROP TABLE IF EXISTS customer; +DROP TABLE IF EXISTS dashboard; +DROP TABLE IF EXISTS device; +DROP TABLE IF EXISTS device_credentials; +DROP TABLE IF EXISTS event; +DROP TABLE IF EXISTS relation; +DROP TABLE IF EXISTS tb_user; +DROP TABLE IF EXISTS tenant; +DROP TABLE IF EXISTS ts_kv; +DROP TABLE IF EXISTS ts_kv_latest; +DROP TABLE IF EXISTS ts_kv_dictionary; +DROP TABLE IF EXISTS user_credentials; +DROP TABLE IF EXISTS widget_type; +DROP TABLE IF EXISTS widgets_bundle; +DROP TABLE IF EXISTS rule_node; +DROP TABLE IF EXISTS rule_chain; +DROP TABLE IF EXISTS entity_view; +DROP TABLE IF EXISTS tb_schema_settings; \ No newline at end of file diff --git a/dao/src/test/resources/sql/timescale/drop-all-tables.sql b/dao/src/test/resources/sql/timescale/drop-all-tables.sql index ac921c0f4a..b2e4a27963 100644 --- a/dao/src/test/resources/sql/timescale/drop-all-tables.sql +++ b/dao/src/test/resources/sql/timescale/drop-all-tables.sql @@ -14,9 +14,11 @@ DROP TABLE IF EXISTS tb_user; DROP TABLE IF EXISTS tenant; DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv_latest; +DROP TABLE IF EXISTS ts_kv_dictionary; DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS widget_type; DROP TABLE IF EXISTS widgets_bundle; DROP TABLE IF EXISTS rule_node; DROP TABLE IF EXISTS rule_chain; -DROP TABLE IF EXISTS entity_view; \ No newline at end of file +DROP TABLE IF EXISTS entity_view; +DROP TABLE IF EXISTS tb_schema_settings; \ No newline at end of file diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index 79ea505bcf..a4398424db 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -59,7 +59,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { } else if (request.releaseRequest) { this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest); } else { - logger.error('[%s] Unknown request recevied!', requestId); + logger.error('[%s] Unknown request received!', requestId); } } catch (err) { diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index e22ec8cd71..33637cee81 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -41,8 +41,6 @@ function KafkaProducer() { } } - let headersData = headers.data; - headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)])); return producer.send( { topic: responseTopic, @@ -50,7 +48,7 @@ function KafkaProducer() { { key: scriptId, value: rawResponse, - headers: headersData + headers: headers.data } ] }); @@ -96,15 +94,10 @@ function KafkaProducer() { eachMessage: async ({topic, partition, message}) => { let headers = message.headers; let key = message.key; - let data = message.value; let msg = {}; - - headers = Object.fromEntries( - Object.entries(headers).map(([key, value]) => [key, [...value]])); - msg.key = key.toString('utf8'); - msg.data = [...data]; - msg.headers = {data: headers} + msg.data = message.value; + msg.headers = {data: headers}; messageProcessor.onJsInvokeMessage(msg); }, }); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java index dc0eb9f396..7338270c9b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java @@ -38,7 +38,7 @@ import java.io.IOException; @Slf4j @RuleNode( type = ComponentType.FILTER, - name = "checks alarm status", + name = "check alarm status", configClazz = TbCheckAlarmStatusNodeConfig.class, relationTypes = {"True", "False"}, nodeDescription = "Checks alarm status.", diff --git a/ui/src/app/api/user.service.js b/ui/src/app/api/user.service.js index dacfe955a2..9ac8270403 100644 --- a/ui/src/app/api/user.service.js +++ b/ui/src/app/api/user.service.js @@ -386,6 +386,26 @@ function UserService($http, $q, $rootScope, adminService, dashboardService, time deferred.reject(); } procceedJwtTokenValidate(); + } else if (locationSearch.username && locationSearch.password) { + var user = {}; + user.name = locationSearch.username; + user.password = locationSearch.password; + $location.search('username', null); + $location.search('password', null); + + loginService.login(user).then(function success(response) { + var token = response.data.token; + var refreshToken = response.data.refreshToken; + try { + updateAndValidateToken(token, 'jwt_token', false); + updateAndValidateToken(refreshToken, 'refresh_token', false); + } catch (e) { + deferred.reject(); + } + procceedJwtTokenValidate(); + }, function fail() { + deferred.reject(); + }); } else { procceedJwtTokenValidate(); } diff --git a/ui/src/app/login/login.controller.js b/ui/src/app/login/login.controller.js index d749ea0936..8dcd2a9d8e 100644 --- a/ui/src/app/login/login.controller.js +++ b/ui/src/app/login/login.controller.js @@ -20,7 +20,7 @@ import logoSvg from '../../svg/logo_title_white.svg'; /* eslint-enable import/no-unresolved, import/default */ /*@ngInject*/ -export default function LoginController(toast, loginService, userService, types, $state, $stateParams/*, $rootScope, $log, $translate*/) { +export default function LoginController(toast, loginService, userService, types, $state/*, $rootScope, $log, $translate*/) { var vm = this; vm.logoSvg = logoSvg; @@ -32,12 +32,6 @@ export default function LoginController(toast, loginService, userService, types, vm.login = login; - if ($stateParams.username && $stateParams.password) { - vm.user.name = $stateParams.username; - vm.user.password = $stateParams.password; - doLogin(); - } - function doLogin() { loginService.login(vm.user).then(function success(response) { var token = response.data.token; diff --git a/ui/src/app/login/login.routes.js b/ui/src/app/login/login.routes.js index 51121a782a..ac9401147b 100644 --- a/ui/src/app/login/login.routes.js +++ b/ui/src/app/login/login.routes.js @@ -25,7 +25,7 @@ import createPasswordTemplate from './create-password.tpl.html'; /*@ngInject*/ export default function LoginRoutes($stateProvider) { $stateProvider.state('login', { - url: '/login?username&password', + url: '/login', module: 'public', views: { "@": {