diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java index 0aa95fa324..d85b66a258 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java @@ -21,6 +21,6 @@ import java.util.List; public interface InsertLatestTsRepository { - void saveOrUpdate(List entities); + List saveOrUpdate(List entities); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index 530ac7ce23..48b54c2281 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -15,23 +15,28 @@ */ package org.thingsboard.server.dao.sqlts.insert.latest.sql; +import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.PreparedStatementCreator; +import org.springframework.jdbc.core.SqlProvider; +import org.springframework.jdbc.support.GeneratedKeyHolder; +import org.springframework.jdbc.support.KeyHolder; import org.springframework.stereotype.Repository; -import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; import java.util.List; +import java.util.Map; @SqlTsLatestAnyDao @@ -44,129 +49,185 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem private Boolean updateByLatestTs; private static final String BATCH_UPDATE = - "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? AND key = ?"; + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq') WHERE entity_id = ? AND key = ?"; private static final String INSERT_OR_UPDATE = - "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + - "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json)"; + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, seq_number) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_seq')) " + + "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq')"; private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?"; private static final String INSERT_OR_UPDATE_BY_LATEST_TS = INSERT_OR_UPDATE + " WHERE ts_kv_latest.ts <= ?"; + private static final String RETURNING = " RETURNING seq_number"; + + private static final String SEQ_NUMBER = "seq_number"; + + private String batchUpdateQuery; + private String insertOrUpdateQuery; + + @PostConstruct + private void init() { + this.batchUpdateQuery = (updateByLatestTs ? BATCH_UPDATE_BY_LATEST_TS : BATCH_UPDATE) + RETURNING; + this.insertOrUpdateQuery = (updateByLatestTs ? INSERT_OR_UPDATE_BY_LATEST_TS : INSERT_OR_UPDATE) + RETURNING; + } + @Override - public void saveOrUpdate(List entities) { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { - String batchUpdateQuery = updateByLatestTs ? BATCH_UPDATE_BY_LATEST_TS : BATCH_UPDATE; - String insertOrUpdateQuery = updateByLatestTs ? INSERT_OR_UPDATE_BY_LATEST_TS : INSERT_OR_UPDATE; + public List saveOrUpdate(List entities) { + return transactionTemplate.execute(status -> { + List seqNumbers = new ArrayList<>(entities.size()); - int[] result = jdbcTemplate.batchUpdate(batchUpdateQuery, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - TsKvLatestEntity tsKvLatestEntity = entities.get(i); - ps.setLong(1, tsKvLatestEntity.getTs()); + KeyHolder keyHolder = new GeneratedKeyHolder(); - if (tsKvLatestEntity.getBooleanValue() != null) { - ps.setBoolean(2, tsKvLatestEntity.getBooleanValue()); - } else { - ps.setNull(2, Types.BOOLEAN); - } + int[] updateResult = onBatchUpdate(entities, keyHolder); - ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue())); + List> seqNumbersList = keyHolder.getKeyList(); - if (tsKvLatestEntity.getLongValue() != null) { - ps.setLong(4, tsKvLatestEntity.getLongValue()); - } else { - ps.setNull(4, Types.BIGINT); - } + int notUpdatedCount = entities.size() - seqNumbersList.size(); - if (tsKvLatestEntity.getDoubleValue() != null) { - ps.setDouble(5, tsKvLatestEntity.getDoubleValue()); - } else { - ps.setNull(5, Types.DOUBLE); - } - - ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); - - ps.setObject(7, tsKvLatestEntity.getEntityId()); - ps.setInt(8, tsKvLatestEntity.getKey()); - if (updateByLatestTs) { - ps.setLong(9, tsKvLatestEntity.getTs()); - } - } - - @Override - public int getBatchSize() { - return entities.size(); - } - }); - - int updatedCount = 0; - for (int i = 0; i < result.length; i++) { - if (result[i] == 0) { - updatedCount++; - } + List toInsertIndexes = new ArrayList<>(notUpdatedCount); + List insertEntities = new ArrayList<>(notUpdatedCount); + int keyHolderIndex = 0; + for (int i = 0; i < updateResult.length; i++) { + if (updateResult[i] == 0) { + insertEntities.add(entities.get(i)); + seqNumbers.add(0L); + toInsertIndexes.add(i); + } else { + seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER)); + keyHolderIndex++; } - - List insertEntities = new ArrayList<>(updatedCount); - for (int i = 0; i < result.length; i++) { - if (result[i] == 0) { - insertEntities.add(entities.get(i)); - } - } - - jdbcTemplate.batchUpdate(insertOrUpdateQuery, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); - ps.setObject(1, tsKvLatestEntity.getEntityId()); - ps.setInt(2, tsKvLatestEntity.getKey()); - - ps.setLong(3, tsKvLatestEntity.getTs()); - ps.setLong(9, tsKvLatestEntity.getTs()); - if (updateByLatestTs) { - ps.setLong(15, tsKvLatestEntity.getTs()); - } - - if (tsKvLatestEntity.getBooleanValue() != null) { - ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); - ps.setBoolean(10, tsKvLatestEntity.getBooleanValue()); - } else { - ps.setNull(4, Types.BOOLEAN); - ps.setNull(10, Types.BOOLEAN); - } - - ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue())); - ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue())); - - if (tsKvLatestEntity.getLongValue() != null) { - ps.setLong(6, tsKvLatestEntity.getLongValue()); - ps.setLong(12, tsKvLatestEntity.getLongValue()); - } else { - ps.setNull(6, Types.BIGINT); - ps.setNull(12, Types.BIGINT); - } - - if (tsKvLatestEntity.getDoubleValue() != null) { - ps.setDouble(7, tsKvLatestEntity.getDoubleValue()); - ps.setDouble(13, tsKvLatestEntity.getDoubleValue()); - } else { - ps.setNull(7, Types.DOUBLE); - ps.setNull(13, Types.DOUBLE); - } - - ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue())); - ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue())); - } - - @Override - public int getBatchSize() { - return insertEntities.size(); - } - }); } + + if (insertEntities.isEmpty()) { + return seqNumbers; + } + + onInsertOrUpdate(insertEntities, keyHolder); + + seqNumbersList = keyHolder.getKeyList(); + + for (int i = 0; i < seqNumbersList.size(); i++) { + seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER)); + } + + return seqNumbers; }); } + + private int[] onBatchUpdate(List entities, KeyHolder keyHolder) { + return jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(batchUpdateQuery), new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + TsKvLatestEntity tsKvLatestEntity = entities.get(i); + ps.setLong(1, tsKvLatestEntity.getTs()); + + if (tsKvLatestEntity.getBooleanValue() != null) { + ps.setBoolean(2, tsKvLatestEntity.getBooleanValue()); + } else { + ps.setNull(2, Types.BOOLEAN); + } + + ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue())); + + if (tsKvLatestEntity.getLongValue() != null) { + ps.setLong(4, tsKvLatestEntity.getLongValue()); + } else { + ps.setNull(4, Types.BIGINT); + } + + if (tsKvLatestEntity.getDoubleValue() != null) { + ps.setDouble(5, tsKvLatestEntity.getDoubleValue()); + } else { + ps.setNull(5, Types.DOUBLE); + } + + ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); + + ps.setObject(7, tsKvLatestEntity.getEntityId()); + ps.setInt(8, tsKvLatestEntity.getKey()); + if (updateByLatestTs) { + ps.setLong(9, tsKvLatestEntity.getTs()); + } + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }, keyHolder); + } + + private void onInsertOrUpdate(List insertEntities, KeyHolder keyHolder) { + jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(insertOrUpdateQuery), new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); + ps.setObject(1, tsKvLatestEntity.getEntityId()); + ps.setInt(2, tsKvLatestEntity.getKey()); + + ps.setLong(3, tsKvLatestEntity.getTs()); + ps.setLong(9, tsKvLatestEntity.getTs()); + if (updateByLatestTs) { + ps.setLong(15, tsKvLatestEntity.getTs()); + } + + if (tsKvLatestEntity.getBooleanValue() != null) { + ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); + ps.setBoolean(10, tsKvLatestEntity.getBooleanValue()); + } else { + ps.setNull(4, Types.BOOLEAN); + ps.setNull(10, Types.BOOLEAN); + } + + ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue())); + ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue())); + + if (tsKvLatestEntity.getLongValue() != null) { + ps.setLong(6, tsKvLatestEntity.getLongValue()); + ps.setLong(12, tsKvLatestEntity.getLongValue()); + } else { + ps.setNull(6, Types.BIGINT); + ps.setNull(12, Types.BIGINT); + } + + if (tsKvLatestEntity.getDoubleValue() != null) { + ps.setDouble(7, tsKvLatestEntity.getDoubleValue()); + ps.setDouble(13, tsKvLatestEntity.getDoubleValue()); + } else { + ps.setNull(7, Types.DOUBLE); + ps.setNull(13, Types.DOUBLE); + } + + ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue())); + ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue())); + } + + @Override + public int getBatchSize() { + return insertEntities.size(); + } + }, keyHolder); + } + + private static class SimplePreparedStatementCreator implements PreparedStatementCreator, SqlProvider { + + private static final String[] COLUMNS = {SEQ_NUMBER}; + private final String sql; + + + public SimplePreparedStatementCreator(String sql) { + this.sql = sql; + } + + @Override + public PreparedStatement createPreparedStatement(Connection con) throws SQLException { + return con.prepareStatement(sql, COLUMNS); + } + + @Override + public String getSql() { + return this.sql; + } + } } diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index f2978b7f4a..b92c27574c 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -538,6 +538,8 @@ CREATE TABLE IF NOT EXISTS entity_view ( CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id) ); +CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_seq cache 1000; + CREATE TABLE IF NOT EXISTS ts_kv_latest ( entity_id uuid NOT NULL, @@ -548,6 +550,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest long_v bigint, dbl_v double precision, json_v json, + seq_number bigint, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java new file mode 100644 index 0000000000..00f458ca52 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java @@ -0,0 +1,151 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.service.timeseries.sql; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.service.AbstractServiceTest; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@DaoSqlTest +public class LatestTimeseriesPerformanceTest extends AbstractServiceTest { + + private static final String STRING_KEY = "stringKey"; + private static final String LONG_KEY = "longKey"; + private static final String DOUBLE_KEY = "doubleKey"; + private static final String BOOLEAN_KEY = "booleanKey"; + public static final int AMOUNT_OF_UNIQ_KEY = 10000; + + private final Random random = new Random(); + + @Autowired + private TimeseriesLatestDao timeseriesLatestDao; + + private ListeningExecutorService testExecutor; + + private EntityId entityId; + + private AtomicLong saveCounter; + + @Before + public void before() { + Tenant tenant = new Tenant(); + tenant.setTitle("My tenant"); + Tenant savedTenant = tenantService.saveTenant(tenant); + Assert.assertNotNull(savedTenant); + tenantId = savedTenant.getId(); + entityId = new DeviceId(UUID.randomUUID()); + saveCounter = new AtomicLong(0); + testExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(200, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"))); + } + + @After + public void after() { + tenantService.deleteTenant(tenantId); + if (testExecutor != null) { + testExecutor.shutdownNow(); + } + } + + @Test + public void test_save_latest_timeseries() throws Exception { + warmup(); + saveCounter.set(0); + + long startTime = System.currentTimeMillis(); + List> futures = new ArrayList<>(); + for (int i = 0; i < 25_000; i++) { + futures.add(save(generateStrEntry(getRandomKey()))); + futures.add(save(generateLngEntry(getRandomKey()))); + futures.add(save(generateDblEntry(getRandomKey()))); + futures.add(save(generateBoolEntry(getRandomKey()))); + } + Futures.allAsList(futures).get(60, TimeUnit.SECONDS); + long endTime = System.currentTimeMillis(); + + long totalTime = endTime - startTime; + + System.out.println("Total time: " + totalTime); + System.out.println("Saved count: " + saveCounter.get()); + System.out.println("Saved per 1 sec: " + saveCounter.get() * 1000 / totalTime); + } + + private void warmup() throws Exception { + List> futures = new ArrayList<>(); + for (int i = 0; i < AMOUNT_OF_UNIQ_KEY; i++) { + futures.add(save(generateStrEntry(i))); + futures.add(save(generateLngEntry(i))); + futures.add(save(generateDblEntry(i))); + futures.add(save(generateBoolEntry(i))); + } + Futures.allAsList(futures).get(60, TimeUnit.SECONDS); + } + + private ListenableFuture save(TsKvEntry tsKvEntry) { + return Futures.transformAsync(testExecutor.submit(() -> timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)), result -> { + saveCounter.incrementAndGet(); + return result; + }, testExecutor); + } + + private TsKvEntry generateStrEntry(int keyIndex) { + return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(STRING_KEY + keyIndex, RandomStringUtils.random(10))); + } + + private TsKvEntry generateLngEntry(int keyIndex) { + return new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(LONG_KEY + keyIndex, random.nextLong())); + } + + private TsKvEntry generateDblEntry(int keyIndex) { + return new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(DOUBLE_KEY + keyIndex, random.nextDouble())); + } + + private TsKvEntry generateBoolEntry(int keyIndex) { + return new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(BOOLEAN_KEY + keyIndex, random.nextBoolean())); + } + + private int getRandomKey() { + return random.nextInt(AMOUNT_OF_UNIQ_KEY); + } + +} diff --git a/dao/src/test/resources/logback.xml b/dao/src/test/resources/logback.xml index 5e293b2982..4f7a2142df 100644 --- a/dao/src/test/resources/logback.xml +++ b/dao/src/test/resources/logback.xml @@ -12,6 +12,7 @@ + diff --git a/dao/src/test/resources/sql/psql/drop-all-tables.sql b/dao/src/test/resources/sql/psql/drop-all-tables.sql index 9c772df45b..3b1b37242f 100644 --- a/dao/src/test/resources/sql/psql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/psql/drop-all-tables.sql @@ -37,6 +37,7 @@ DROP TABLE IF EXISTS tenant; DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv_latest; DROP TABLE IF EXISTS ts_kv_dictionary; +DROP SEQUENCE IF EXISTS ts_kv_latest_seq; DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS widgets_bundle_widget; DROP TABLE IF EXISTS widget_type;