diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 16de3fd708..8be9a8a1c5 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -204,6 +204,18 @@ sql: batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" + ts: + batch_size: "${SQL_TS_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" + ts_latest: + batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" + ts_timescale: + batch_size: "${SQL_TS_TIMESCALE_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_TIMESCALE_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_TIMESCALE_BATCH_STATS_PRINT_MS:10000}" # Specify whether to remove null characters from strValue of attributes and timeseries before insert remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java similarity index 97% rename from dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java rename to dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java index d8c0e4ef0a..4c8a2606d8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java @@ -35,7 +35,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM @Data @MappedSuperclass -public abstract class AbsractTsKvEntity { +public abstract class AbstractTsKvEntity { protected static final String SUM = "SUM"; protected static final String AVG = "AVG"; @@ -80,7 +80,7 @@ public abstract class AbsractTsKvEntity { protected static boolean isAllNull(Object... args) { for (Object arg : args) { - if(arg != null) { + if (arg != null) { return false; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java index 3427c928f4..753e2c10fa 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java @@ -21,7 +21,7 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ToData; -import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; +import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import javax.persistence.Column; import javax.persistence.ColumnResult; @@ -115,7 +115,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F resultSetMapping = "timescaleCountMapping" ) }) -public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToData { +public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToData { @Id @Column(name = TENANT_ID_COLUMN) diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java index c5b9237f13..dab344cb44 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java @@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ToData; -import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; +import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import javax.persistence.Column; import javax.persistence.Entity; @@ -37,7 +37,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @Entity @Table(name = "ts_kv") @IdClass(TsKvCompositeKey.class) -public final class TsKvEntity extends AbsractTsKvEntity implements ToData { +public final class TsKvEntity extends AbstractTsKvEntity implements ToData { @Id @Enumerated(EnumType.STRING) diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvLatestEntity.java index 3c1f735834..fb558d7b87 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvLatestEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvLatestEntity.java @@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ToData; -import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; +import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import javax.persistence.Column; import javax.persistence.Entity; @@ -37,7 +37,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @Entity @Table(name = "ts_kv_latest") @IdClass(TsKvLatestCompositeKey.class) -public final class TsKvLatestEntity extends AbsractTsKvEntity implements ToData { +public final class TsKvLatestEntity extends AbstractTsKvEntity implements ToData { @Id @Enumerated(EnumType.STRING) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java index 7630ccdaad..6e894fb382 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java @@ -92,8 +92,8 @@ public class TbSqlBlockingQueue implements TbSqlQueue { }); logExecutor.scheduleAtFixedRate(() -> { - log.info("Attributes queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", - queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0)); + log.info("[{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", + params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0)); }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java index 919ab5314d..a4cd67abdc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java @@ -15,8 +15,11 @@ */ package org.thingsboard.server.dao.sqlts; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import org.springframework.transaction.support.TransactionTemplate; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; @@ -57,6 +60,12 @@ public abstract class AbstractInsertRepository { @PersistenceContext protected EntityManager entityManager; + @Autowired + protected JdbcTemplate jdbcTemplate; + + @Autowired + protected TransactionTemplate transactionTemplate; + protected static String getInsertOrUpdateStringHsql(String tableName, String constraint, String value, String nullValues) { return "MERGE INTO " + tableName + " USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON " + constraint + " WHEN MATCHED THEN UPDATE SET " + tableName + "." + value + " = A." + value + ", " + tableName + ".ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")"; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java index a31b0e395b..e9b10eafa3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java @@ -19,11 +19,15 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.stereotype.Repository; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; +import java.util.List; + @Repository public abstract class AbstractLatestInsertRepository extends AbstractInsertRepository { public abstract void saveOrUpdate(TsKvLatestEntity entity); + public abstract void saveOrUpdate(List entities); + protected void processSaveOrUpdate(TsKvLatestEntity entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) { if (entity.getBooleanValue() != null) { saveOrUpdateBoolean(entity, requestBoolValue); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java index 6f1b9b1ed3..4787cd7a46 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java @@ -17,13 +17,17 @@ package org.thingsboard.server.dao.sqlts; import org.springframework.data.jpa.repository.Modifying; import org.springframework.stereotype.Repository; -import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; +import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; + +import java.util.List; @Repository -public abstract class AbstractTimeseriesInsertRepository extends AbstractInsertRepository { +public abstract class AbstractTimeseriesInsertRepository extends AbstractInsertRepository { public abstract void saveOrUpdate(T entity); + public abstract void saveOrUpdate(List entities); + protected void processSaveOrUpdate(T entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) { if (entity.getBooleanValue() != null) { saveOrUpdateBoolean(entity, requestBoolValue); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java index 11f4ea4b5d..8493703275 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java @@ -15,13 +15,22 @@ */ package org.thingsboard.server.dao.sqlts.timescale; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + @TimescaleDBTsDao @PsqlDao @Repository @@ -30,14 +39,123 @@ public class TimescaleInsertRepository extends AbstractTimeseriesInsertRepositor private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS); private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); + private static final String BATCH_UPDATE = + "UPDATE tenant_ts_kv SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_type = ? AND entity_id = ? and key = ? and ts = ?"; + + + private static final String INSERT_OR_UPDATE = + "INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; + @Override public void saveOrUpdate(TimescaleTsKvEntity entity) { processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); } + @Override + public void saveOrUpdate(List entities) { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + + if (entities.get(i).getBooleanValue() != null) { + ps.setBoolean(1, entities.get(i).getBooleanValue()); + } else { + ps.setNull(1, Types.BOOLEAN); + } + + ps.setString(2, replaceNullChars(entities.get(i).getStrValue())); + + if (entities.get(i).getLongValue() != null) { + ps.setLong(3, entities.get(i).getLongValue()); + } else { + ps.setNull(3, Types.BIGINT); + } + + if (entities.get(i).getDoubleValue() != null) { + ps.setDouble(4, entities.get(i).getDoubleValue()); + } else { + ps.setNull(4, Types.DOUBLE); + } + + ps.setString(5, entities.get(i).getTenantId()); + ps.setString(6, entities.get(i).getEntityId()); + ps.setString(7, entities.get(i).getKey()); + ps.setLong(8, entities.get(i).getTs()); + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }); + + int updatedCount = 0; + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + updatedCount++; + } + } + + List insertEntities = new ArrayList<>(updatedCount); + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + insertEntities.add(entities.get(i)); + } + } + + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setString(1, entities.get(i).getTenantId()); + ps.setString(2, entities.get(i).getEntityId()); + ps.setString(3, entities.get(i).getKey()); + ps.setLong(4, entities.get(i).getTs()); + + if (entities.get(i).getBooleanValue() != null) { + ps.setBoolean(5, entities.get(i).getBooleanValue()); + ps.setBoolean(9, entities.get(i).getBooleanValue()); + } else { + ps.setNull(5, Types.BOOLEAN); + ps.setNull(9, Types.BOOLEAN); + } + + ps.setString(6, replaceNullChars(entities.get(i).getStrValue())); + ps.setString(10, replaceNullChars(entities.get(i).getStrValue())); + + + if (entities.get(i).getLongValue() != null) { + ps.setLong(7, entities.get(i).getLongValue()); + ps.setLong(11, entities.get(i).getLongValue()); + } else { + ps.setNull(7, Types.BIGINT); + ps.setNull(11, Types.BIGINT); + } + + if (entities.get(i).getDoubleValue() != null) { + ps.setDouble(8, entities.get(i).getDoubleValue()); + ps.setDouble(12, entities.get(i).getDoubleValue()); + } else { + ps.setNull(8, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); + } + } + + @Override + public int getBatchSize() { + return insertEntities.size(); + } + }); + } + }); + } + @Override protected void saveOrUpdateBoolean(TimescaleTsKvEntity entity, String query) { entityManager.createNativeQuery(query) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 844f22a31c..961f545567 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -36,11 +37,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; +import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -66,6 +72,39 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Autowired private AbstractTimeseriesInsertRepository insertRepository; + @Autowired + ScheduledLogExecutorComponent logExecutor; + + @Value("${sql.ts_timescale.batch_size:1000}") + private int batchSize; + + @Value("${sql.ts_timescale.batch_max_delay:100}") + private long maxDelay; + + @Value("${sql.ts_timescale.stats_print_interval_ms:1000}") + private long statsPrintIntervalMs; + + private TbSqlBlockingQueue queue; + + @PostConstruct + private void init() { + TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() + .logName("TS Timescale") + .batchSize(batchSize) + .maxDelay(maxDelay) + .statsPrintIntervalMs(statsPrintIntervalMs) + .build(); + queue = new TbSqlBlockingQueue<>(params); + queue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); + } + + @PreDestroy + private void destroy() { + if (queue != null) { + queue.destroy(); + } + } + @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); @@ -126,11 +165,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); - log.trace("Saving entity to timescale db: {}", entity); - return insertService.submit(() -> { - insertRepository.saveOrUpdate(entity); - return null; - }); + return queue.add(entity); } @Override @@ -209,7 +244,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) { List> result = new ArrayList<>(); timescaleTsKvEntities.forEach(entity -> { - if(entity != null && entity.isNotEmpty()) { + if (entity != null && entity.isNotEmpty()) { entity.setEntityId(entityIdStr); entity.setTenantId(tenantIdStr); entity.setKey(key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java index 84250406d8..07650396f2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java @@ -22,6 +22,8 @@ import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.util.List; + @SqlTsDao @HsqlDao @Repository @@ -40,6 +42,11 @@ public class HsqlLatestInsertRepository extends AbstractLatestInsertRepository { processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); } + @Override + public void saveOrUpdate(List entities) { + + } + @Override protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) { entityManager.createNativeQuery(query) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java index 927bcd2443..8dbefd4443 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java @@ -22,6 +22,8 @@ import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.util.List; + @SqlTsDao @HsqlDao @Repository @@ -40,6 +42,11 @@ public class HsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); } + @Override + public void saveOrUpdate(List entities) { + + } + @Override protected void saveOrUpdateBoolean(TsKvEntity entity, String query) { entityManager.createNativeQuery(query) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java index b70b59604f..7c198d9a73 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -38,6 +39,9 @@ import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestCompositeKey; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; +import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; @@ -46,6 +50,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.SqlTsDao; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -73,6 +79,63 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese @Autowired private AbstractLatestInsertRepository insertLatestRepository; + @Autowired + ScheduledLogExecutorComponent logExecutor; + + @Value("${sql.ts.batch_size:1000}") + private int tsBatchSize; + + @Value("${sql.ts.batch_max_delay:100}") + private long tsMaxDelay; + + @Value("${sql.ts.stats_print_interval_ms:1000}") + private long tsStatsPrintIntervalMs; + + @Value("${sql.ts_latest.batch_size:1000}") + private int tsLatestBatchSize; + + @Value("${sql.ts_latest.batch_max_delay:100}") + private long tsLatestMaxDelay; + + @Value("${sql.ts_latest.stats_print_interval_ms:1000}") + private long tsLatestStatsPrintIntervalMs; + + private TbSqlBlockingQueue tsQueue; + private TbSqlBlockingQueue tsLatestQueue; + + + @PostConstruct + private void init() { + TbSqlBlockingQueueParams tsParams = TbSqlBlockingQueueParams.builder() + .logName("TS") + .batchSize(tsBatchSize) + .maxDelay(tsMaxDelay) + .statsPrintIntervalMs(tsStatsPrintIntervalMs) + .build(); + tsQueue = new TbSqlBlockingQueue<>(tsParams); + tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); + + TbSqlBlockingQueueParams tsLatestParams = TbSqlBlockingQueueParams.builder() + .logName("TS Latest") + .batchSize(tsLatestBatchSize) + .maxDelay(tsLatestMaxDelay) + .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs) + .build(); + tsLatestQueue = new TbSqlBlockingQueue<>(tsLatestParams); + tsLatestQueue.init(logExecutor, v -> insertLatestRepository.saveOrUpdate(v)); + } + + @PreDestroy + private void destroy() { + if (tsQueue != null) { + tsQueue.destroy(); + } + + if (tsLatestQueue != null) { + tsLatestQueue.destroy(); + } + } + @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); @@ -266,10 +329,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); log.trace("Saving entity: {}", entity); - return insertService.submit(() -> { - insertRepository.saveOrUpdate(entity); - return null; - }); + return tsQueue.add(entity); } @Override @@ -288,10 +348,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null)); latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); - return insertService.submit(() -> { - insertLatestRepository.saveOrUpdate(latestEntity); - return null; - }); + return tsLatestQueue.add(latestEntity); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java index 5d50bf0dd9..92252e9d18 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java @@ -15,13 +15,22 @@ */ package org.thingsboard.server.dao.sqlts.ts; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + @SqlTsDao @PsqlDao @Repository @@ -35,11 +44,121 @@ public class PsqlLatestInsertRepository extends AbstractLatestInsertRepository { private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); + private static final String BATCH_UPDATE = + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_type = ? AND entity_id = ? and key = ?"; + + + private static final String INSERT_OR_UPDATE = + "INSERT INTO ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (entity_type, entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; + @Override public void saveOrUpdate(TsKvLatestEntity entity) { processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); } + @Override + public void saveOrUpdate(List entities) { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setLong(1, entities.get(i).getTs()); + + if (entities.get(i).getBooleanValue() != null) { + ps.setBoolean(2, entities.get(i).getBooleanValue()); + } else { + ps.setNull(2, Types.BOOLEAN); + } + + ps.setString(3, replaceNullChars(entities.get(i).getStrValue())); + + if (entities.get(i).getLongValue() != null) { + ps.setLong(4, entities.get(i).getLongValue()); + } else { + ps.setNull(4, Types.BIGINT); + } + + if (entities.get(i).getDoubleValue() != null) { + ps.setDouble(5, entities.get(i).getDoubleValue()); + } else { + ps.setNull(5, Types.DOUBLE); + } + + ps.setString(6, entities.get(i).getEntityType().name()); + ps.setString(7, entities.get(i).getEntityId()); + ps.setString(8, entities.get(i).getKey()); + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }); + + int updatedCount = 0; + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + updatedCount++; + } + } + + List insertEntities = new ArrayList<>(updatedCount); + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + insertEntities.add(entities.get(i)); + } + } + + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setString(1, insertEntities.get(i).getEntityType().name()); + ps.setString(2, insertEntities.get(i).getEntityId()); + ps.setString(3, insertEntities.get(i).getKey()); + ps.setLong(4, insertEntities.get(i).getTs()); + ps.setLong(9, insertEntities.get(i).getTs()); + + if (insertEntities.get(i).getBooleanValue() != null) { + ps.setBoolean(5, insertEntities.get(i).getBooleanValue()); + ps.setBoolean(10, insertEntities.get(i).getBooleanValue()); + } else { + ps.setNull(5, Types.BOOLEAN); + ps.setNull(10, Types.BOOLEAN); + } + + ps.setString(6, replaceNullChars(entities.get(i).getStrValue())); + ps.setString(11, replaceNullChars(entities.get(i).getStrValue())); + + + if (insertEntities.get(i).getLongValue() != null) { + ps.setLong(7, insertEntities.get(i).getLongValue()); + ps.setLong(12, insertEntities.get(i).getLongValue()); + } else { + ps.setNull(7, Types.BIGINT); + ps.setNull(12, Types.BIGINT); + } + + if (insertEntities.get(i).getDoubleValue() != null) { + ps.setDouble(8, insertEntities.get(i).getDoubleValue()); + ps.setDouble(13, insertEntities.get(i).getDoubleValue()); + } else { + ps.setNull(8, Types.DOUBLE); + ps.setNull(13, Types.DOUBLE); + } + } + + @Override + public int getBatchSize() { + return insertEntities.size(); + } + }); + } + }); + } + @Override protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) { entityManager.createNativeQuery(query) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java index 0baea27d7b..edc37822b1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.sqlts.ts; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; @@ -22,6 +23,11 @@ import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + @SqlTsDao @PsqlDao @Repository @@ -35,6 +41,10 @@ public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE = + "INSERT INTO ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (entity_type, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; + @Override public void saveOrUpdate(TsKvEntity entity) { processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); @@ -83,4 +93,50 @@ public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo .setParameter("dbl_v", entity.getDoubleValue()) .executeUpdate(); } + + @Override + public void saveOrUpdate(List entities) { + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setString(1, entities.get(i).getEntityType().name()); + ps.setString(2, entities.get(i).getEntityId()); + ps.setString(3, entities.get(i).getKey()); + ps.setLong(4, entities.get(i).getTs()); + + if (entities.get(i).getBooleanValue() != null) { + ps.setBoolean(5, entities.get(i).getBooleanValue()); + ps.setBoolean(9, entities.get(i).getBooleanValue()); + } else { + ps.setNull(5, Types.BOOLEAN); + ps.setNull(9, Types.BOOLEAN); + } + + ps.setString(6, replaceNullChars(entities.get(i).getStrValue())); + ps.setString(10, replaceNullChars(entities.get(i).getStrValue())); + + + if (entities.get(i).getLongValue() != null) { + ps.setLong(7, entities.get(i).getLongValue()); + ps.setLong(11, entities.get(i).getLongValue()); + } else { + ps.setNull(7, Types.BIGINT); + ps.setNull(11, Types.BIGINT); + } + + if (entities.get(i).getDoubleValue() != null) { + ps.setDouble(8, entities.get(i).getDoubleValue()); + ps.setDouble(12, entities.get(i).getDoubleValue()); + } else { + ps.setNull(8, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); + } + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }); + } } \ No newline at end of file