added sequence numver for lates timeseries

This commit is contained in:
YevhenBondarenko 2024-06-10 16:02:32 +02:00
parent 2c2273d15b
commit c44ba6307c
6 changed files with 328 additions and 111 deletions

View File

@ -21,6 +21,6 @@ import java.util.List;
public interface InsertLatestTsRepository { public interface InsertLatestTsRepository {
void saveOrUpdate(List<TsKvLatestEntity> entities); List<Long> saveOrUpdate(List<TsKvLatestEntity> entities);
} }

View File

@ -15,23 +15,28 @@
*/ */
package org.thingsboard.server.dao.sqlts.insert.latest.sql; package org.thingsboard.server.dao.sqlts.insert.latest.sql;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.SqlProvider;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
@SqlTsLatestAnyDao @SqlTsLatestAnyDao
@ -44,129 +49,185 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem
private Boolean updateByLatestTs; private Boolean updateByLatestTs;
private static final String BATCH_UPDATE = private static final String BATCH_UPDATE =
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? AND key = ?"; "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq') WHERE entity_id = ? AND key = ?";
private static final String INSERT_OR_UPDATE = private static final String INSERT_OR_UPDATE =
"INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, seq_number) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_seq')) " +
"ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json)"; "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq')";
private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?"; private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?";
private static final String INSERT_OR_UPDATE_BY_LATEST_TS = INSERT_OR_UPDATE + " WHERE ts_kv_latest.ts <= ?"; private static final String INSERT_OR_UPDATE_BY_LATEST_TS = INSERT_OR_UPDATE + " WHERE ts_kv_latest.ts <= ?";
private static final String RETURNING = " RETURNING seq_number";
private static final String SEQ_NUMBER = "seq_number";
private String batchUpdateQuery;
private String insertOrUpdateQuery;
@PostConstruct
private void init() {
this.batchUpdateQuery = (updateByLatestTs ? BATCH_UPDATE_BY_LATEST_TS : BATCH_UPDATE) + RETURNING;
this.insertOrUpdateQuery = (updateByLatestTs ? INSERT_OR_UPDATE_BY_LATEST_TS : INSERT_OR_UPDATE) + RETURNING;
}
@Override @Override
public void saveOrUpdate(List<TsKvLatestEntity> entities) { public List<Long> saveOrUpdate(List<TsKvLatestEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() { return transactionTemplate.execute(status -> {
@Override List<Long> seqNumbers = new ArrayList<>(entities.size());
protected void doInTransactionWithoutResult(TransactionStatus status) {
String batchUpdateQuery = updateByLatestTs ? BATCH_UPDATE_BY_LATEST_TS : BATCH_UPDATE;
String insertOrUpdateQuery = updateByLatestTs ? INSERT_OR_UPDATE_BY_LATEST_TS : INSERT_OR_UPDATE;
int[] result = jdbcTemplate.batchUpdate(batchUpdateQuery, new BatchPreparedStatementSetter() { KeyHolder keyHolder = new GeneratedKeyHolder();
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = entities.get(i);
ps.setLong(1, tsKvLatestEntity.getTs());
if (tsKvLatestEntity.getBooleanValue() != null) { int[] updateResult = onBatchUpdate(entities, keyHolder);
ps.setBoolean(2, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(2, Types.BOOLEAN);
}
ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue())); List<Map<String, Object>> seqNumbersList = keyHolder.getKeyList();
if (tsKvLatestEntity.getLongValue() != null) { int notUpdatedCount = entities.size() - seqNumbersList.size();
ps.setLong(4, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(4, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) { List<Integer> toInsertIndexes = new ArrayList<>(notUpdatedCount);
ps.setDouble(5, tsKvLatestEntity.getDoubleValue()); List<TsKvLatestEntity> insertEntities = new ArrayList<>(notUpdatedCount);
} else { int keyHolderIndex = 0;
ps.setNull(5, Types.DOUBLE); for (int i = 0; i < updateResult.length; i++) {
} if (updateResult[i] == 0) {
insertEntities.add(entities.get(i));
ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); seqNumbers.add(0L);
toInsertIndexes.add(i);
ps.setObject(7, tsKvLatestEntity.getEntityId()); } else {
ps.setInt(8, tsKvLatestEntity.getKey()); seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER));
if (updateByLatestTs) { keyHolderIndex++;
ps.setLong(9, tsKvLatestEntity.getTs());
}
}
@Override
public int getBatchSize() {
return entities.size();
}
});
int updatedCount = 0;
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
updatedCount++;
}
} }
List<TsKvLatestEntity> insertEntities = new ArrayList<>(updatedCount);
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
insertEntities.add(entities.get(i));
}
}
jdbcTemplate.batchUpdate(insertOrUpdateQuery, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
ps.setObject(1, tsKvLatestEntity.getEntityId());
ps.setInt(2, tsKvLatestEntity.getKey());
ps.setLong(3, tsKvLatestEntity.getTs());
ps.setLong(9, tsKvLatestEntity.getTs());
if (updateByLatestTs) {
ps.setLong(15, tsKvLatestEntity.getTs());
}
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());
ps.setBoolean(10, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
}
ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue()));
ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue()));
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(6, tsKvLatestEntity.getLongValue());
ps.setLong(12, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(7, tsKvLatestEntity.getDoubleValue());
ps.setDouble(13, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
}
ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue()));
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
});
} }
if (insertEntities.isEmpty()) {
return seqNumbers;
}
onInsertOrUpdate(insertEntities, keyHolder);
seqNumbersList = keyHolder.getKeyList();
for (int i = 0; i < seqNumbersList.size(); i++) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER));
}
return seqNumbers;
}); });
} }
private int[] onBatchUpdate(List<TsKvLatestEntity> entities, KeyHolder keyHolder) {
return jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(batchUpdateQuery), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = entities.get(i);
ps.setLong(1, tsKvLatestEntity.getTs());
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(2, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(2, Types.BOOLEAN);
}
ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue()));
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(4, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(4, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(5, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(5, Types.DOUBLE);
}
ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setObject(7, tsKvLatestEntity.getEntityId());
ps.setInt(8, tsKvLatestEntity.getKey());
if (updateByLatestTs) {
ps.setLong(9, tsKvLatestEntity.getTs());
}
}
@Override
public int getBatchSize() {
return entities.size();
}
}, keyHolder);
}
private void onInsertOrUpdate(List<TsKvLatestEntity> insertEntities, KeyHolder keyHolder) {
jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(insertOrUpdateQuery), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
ps.setObject(1, tsKvLatestEntity.getEntityId());
ps.setInt(2, tsKvLatestEntity.getKey());
ps.setLong(3, tsKvLatestEntity.getTs());
ps.setLong(9, tsKvLatestEntity.getTs());
if (updateByLatestTs) {
ps.setLong(15, tsKvLatestEntity.getTs());
}
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());
ps.setBoolean(10, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
}
ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue()));
ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue()));
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(6, tsKvLatestEntity.getLongValue());
ps.setLong(12, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(7, tsKvLatestEntity.getDoubleValue());
ps.setDouble(13, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
}
ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue()));
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
}, keyHolder);
}
private static class SimplePreparedStatementCreator implements PreparedStatementCreator, SqlProvider {
private static final String[] COLUMNS = {SEQ_NUMBER};
private final String sql;
public SimplePreparedStatementCreator(String sql) {
this.sql = sql;
}
@Override
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
return con.prepareStatement(sql, COLUMNS);
}
@Override
public String getSql() {
return this.sql;
}
}
} }

View File

@ -538,6 +538,8 @@ CREATE TABLE IF NOT EXISTS entity_view (
CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id) CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id)
); );
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_seq cache 1000;
CREATE TABLE IF NOT EXISTS ts_kv_latest CREATE TABLE IF NOT EXISTS ts_kv_latest
( (
entity_id uuid NOT NULL, entity_id uuid NOT NULL,
@ -548,6 +550,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
long_v bigint, long_v bigint,
dbl_v double precision, dbl_v double precision,
json_v json, json_v json,
seq_number bigint,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
); );

View File

@ -0,0 +1,151 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.service.timeseries.sql;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@DaoSqlTest
public class LatestTimeseriesPerformanceTest extends AbstractServiceTest {
private static final String STRING_KEY = "stringKey";
private static final String LONG_KEY = "longKey";
private static final String DOUBLE_KEY = "doubleKey";
private static final String BOOLEAN_KEY = "booleanKey";
public static final int AMOUNT_OF_UNIQ_KEY = 10000;
private final Random random = new Random();
@Autowired
private TimeseriesLatestDao timeseriesLatestDao;
private ListeningExecutorService testExecutor;
private EntityId entityId;
private AtomicLong saveCounter;
@Before
public void before() {
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
Tenant savedTenant = tenantService.saveTenant(tenant);
Assert.assertNotNull(savedTenant);
tenantId = savedTenant.getId();
entityId = new DeviceId(UUID.randomUUID());
saveCounter = new AtomicLong(0);
testExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(200, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")));
}
@After
public void after() {
tenantService.deleteTenant(tenantId);
if (testExecutor != null) {
testExecutor.shutdownNow();
}
}
@Test
public void test_save_latest_timeseries() throws Exception {
warmup();
saveCounter.set(0);
long startTime = System.currentTimeMillis();
List<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 25_000; i++) {
futures.add(save(generateStrEntry(getRandomKey())));
futures.add(save(generateLngEntry(getRandomKey())));
futures.add(save(generateDblEntry(getRandomKey())));
futures.add(save(generateBoolEntry(getRandomKey())));
}
Futures.allAsList(futures).get(60, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
System.out.println("Total time: " + totalTime);
System.out.println("Saved count: " + saveCounter.get());
System.out.println("Saved per 1 sec: " + saveCounter.get() * 1000 / totalTime);
}
private void warmup() throws Exception {
List<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < AMOUNT_OF_UNIQ_KEY; i++) {
futures.add(save(generateStrEntry(i)));
futures.add(save(generateLngEntry(i)));
futures.add(save(generateDblEntry(i)));
futures.add(save(generateBoolEntry(i)));
}
Futures.allAsList(futures).get(60, TimeUnit.SECONDS);
}
private ListenableFuture<?> save(TsKvEntry tsKvEntry) {
return Futures.transformAsync(testExecutor.submit(() -> timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)), result -> {
saveCounter.incrementAndGet();
return result;
}, testExecutor);
}
private TsKvEntry generateStrEntry(int keyIndex) {
return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(STRING_KEY + keyIndex, RandomStringUtils.random(10)));
}
private TsKvEntry generateLngEntry(int keyIndex) {
return new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(LONG_KEY + keyIndex, random.nextLong()));
}
private TsKvEntry generateDblEntry(int keyIndex) {
return new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(DOUBLE_KEY + keyIndex, random.nextDouble()));
}
private TsKvEntry generateBoolEntry(int keyIndex) {
return new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(BOOLEAN_KEY + keyIndex, random.nextBoolean()));
}
private int getRandomKey() {
return random.nextInt(AMOUNT_OF_UNIQ_KEY);
}
}

View File

@ -12,6 +12,7 @@
<!-- Log Hibernate SQL queries --> <!-- Log Hibernate SQL queries -->
<!-- <logger name="org.hibernate.SQL" level="DEBUG"/> --> <!-- <logger name="org.hibernate.SQL" level="DEBUG"/> -->
<!-- <logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG" /> -->
<root level="WARN"> <root level="WARN">
<appender-ref ref="console"/> <appender-ref ref="console"/>

View File

@ -37,6 +37,7 @@ DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_latest; DROP TABLE IF EXISTS ts_kv_latest;
DROP TABLE IF EXISTS ts_kv_dictionary; DROP TABLE IF EXISTS ts_kv_dictionary;
DROP SEQUENCE IF EXISTS ts_kv_latest_seq;
DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widgets_bundle_widget; DROP TABLE IF EXISTS widgets_bundle_widget;
DROP TABLE IF EXISTS widget_type; DROP TABLE IF EXISTS widget_type;