added support batch telemetry

This commit is contained in:
YevhenBondarenko 2019-12-05 15:21:41 +02:00
parent 0b6a68ad30
commit 00ff95bccf
16 changed files with 455 additions and 27 deletions

View File

@ -204,6 +204,18 @@ sql:
batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}"
ts:
batch_size: "${SQL_TS_BATCH_SIZE:10000}"
batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}"
ts_latest:
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
ts_timescale:
batch_size: "${SQL_TS_TIMESCALE_BATCH_SIZE:10000}"
batch_max_delay: "${SQL_TS_TIMESCALE_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_TS_TIMESCALE_BATCH_STATS_PRINT_MS:10000}"
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"

View File

@ -35,7 +35,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM
@Data
@MappedSuperclass
public abstract class AbsractTsKvEntity {
public abstract class AbstractTsKvEntity {
protected static final String SUM = "SUM";
protected static final String AVG = "AVG";
@ -80,7 +80,7 @@ public abstract class AbsractTsKvEntity {
protected static boolean isAllNull(Object... args) {
for (Object arg : args) {
if(arg != null) {
if (arg != null) {
return false;
}
}

View File

@ -21,7 +21,7 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.ColumnResult;
@ -115,7 +115,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F
resultSetMapping = "timescaleCountMapping"
)
})
public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToData<TsKvEntry> {
public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
@Id
@Column(name = TENANT_ID_COLUMN)

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.Entity;
@ -37,7 +37,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@Entity
@Table(name = "ts_kv")
@IdClass(TsKvCompositeKey.class)
public final class TsKvEntity extends AbsractTsKvEntity implements ToData<TsKvEntry> {
public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
@Id
@Enumerated(EnumType.STRING)

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.Entity;
@ -37,7 +37,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@Entity
@Table(name = "ts_kv_latest")
@IdClass(TsKvLatestCompositeKey.class)
public final class TsKvLatestEntity extends AbsractTsKvEntity implements ToData<TsKvEntry> {
public final class TsKvLatestEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
@Id
@Enumerated(EnumType.STRING)

View File

@ -92,8 +92,8 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
});
logExecutor.scheduleAtFixedRate(() -> {
log.info("Attributes queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
log.info("[{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
}, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
}

View File

@ -15,8 +15,11 @@
*/
package org.thingsboard.server.dao.sqlts;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.support.TransactionTemplate;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@ -57,6 +60,12 @@ public abstract class AbstractInsertRepository {
@PersistenceContext
protected EntityManager entityManager;
@Autowired
protected JdbcTemplate jdbcTemplate;
@Autowired
protected TransactionTemplate transactionTemplate;
protected static String getInsertOrUpdateStringHsql(String tableName, String constraint, String value, String nullValues) {
return "MERGE INTO " + tableName + " USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON " + constraint + " WHEN MATCHED THEN UPDATE SET " + tableName + "." + value + " = A." + value + ", " + tableName + ".ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")";
}

View File

@ -19,11 +19,15 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import java.util.List;
@Repository
public abstract class AbstractLatestInsertRepository extends AbstractInsertRepository {
public abstract void saveOrUpdate(TsKvLatestEntity entity);
public abstract void saveOrUpdate(List<TsKvLatestEntity> entities);
protected void processSaveOrUpdate(TsKvLatestEntity entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) {
if (entity.getBooleanValue() != null) {
saveOrUpdateBoolean(entity, requestBoolValue);

View File

@ -17,13 +17,17 @@ package org.thingsboard.server.dao.sqlts;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import java.util.List;
@Repository
public abstract class AbstractTimeseriesInsertRepository<T extends AbsractTsKvEntity> extends AbstractInsertRepository {
public abstract class AbstractTimeseriesInsertRepository<T extends AbstractTsKvEntity> extends AbstractInsertRepository {
public abstract void saveOrUpdate(T entity);
public abstract void saveOrUpdate(List<T> entities);
protected void processSaveOrUpdate(T entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) {
if (entity.getBooleanValue() != null) {
saveOrUpdateBoolean(entity, requestBoolValue);

View File

@ -15,13 +15,22 @@
*/
package org.thingsboard.server.dao.sqlts.timescale;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@TimescaleDBTsDao
@PsqlDao
@Repository
@ -30,14 +39,123 @@ public class TimescaleInsertRepository extends AbstractTimeseriesInsertRepositor
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
private static final String BATCH_UPDATE =
"UPDATE tenant_ts_kv SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_type = ? AND entity_id = ? and key = ? and ts = ?";
private static final String INSERT_OR_UPDATE =
"INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;";
@Override
public void saveOrUpdate(TimescaleTsKvEntity entity) {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
public void saveOrUpdate(List<TimescaleTsKvEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
if (entities.get(i).getBooleanValue() != null) {
ps.setBoolean(1, entities.get(i).getBooleanValue());
} else {
ps.setNull(1, Types.BOOLEAN);
}
ps.setString(2, replaceNullChars(entities.get(i).getStrValue()));
if (entities.get(i).getLongValue() != null) {
ps.setLong(3, entities.get(i).getLongValue());
} else {
ps.setNull(3, Types.BIGINT);
}
if (entities.get(i).getDoubleValue() != null) {
ps.setDouble(4, entities.get(i).getDoubleValue());
} else {
ps.setNull(4, Types.DOUBLE);
}
ps.setString(5, entities.get(i).getTenantId());
ps.setString(6, entities.get(i).getEntityId());
ps.setString(7, entities.get(i).getKey());
ps.setLong(8, entities.get(i).getTs());
}
@Override
public int getBatchSize() {
return entities.size();
}
});
int updatedCount = 0;
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
updatedCount++;
}
}
List<TimescaleTsKvEntity> insertEntities = new ArrayList<>(updatedCount);
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
insertEntities.add(entities.get(i));
}
}
jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, entities.get(i).getTenantId());
ps.setString(2, entities.get(i).getEntityId());
ps.setString(3, entities.get(i).getKey());
ps.setLong(4, entities.get(i).getTs());
if (entities.get(i).getBooleanValue() != null) {
ps.setBoolean(5, entities.get(i).getBooleanValue());
ps.setBoolean(9, entities.get(i).getBooleanValue());
} else {
ps.setNull(5, Types.BOOLEAN);
ps.setNull(9, Types.BOOLEAN);
}
ps.setString(6, replaceNullChars(entities.get(i).getStrValue()));
ps.setString(10, replaceNullChars(entities.get(i).getStrValue()));
if (entities.get(i).getLongValue() != null) {
ps.setLong(7, entities.get(i).getLongValue());
ps.setLong(11, entities.get(i).getLongValue());
} else {
ps.setNull(7, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
if (entities.get(i).getDoubleValue() != null) {
ps.setDouble(8, entities.get(i).getDoubleValue());
ps.setDouble(12, entities.get(i).getDoubleValue());
} else {
ps.setNull(8, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
});
}
});
}
@Override
protected void saveOrUpdateBoolean(TimescaleTsKvEntity entity, String query) {
entityManager.createNativeQuery(query)

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@ -36,11 +37,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -66,6 +72,39 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
@Autowired
private AbstractTimeseriesInsertRepository insertRepository;
@Autowired
ScheduledLogExecutorComponent logExecutor;
@Value("${sql.ts_timescale.batch_size:1000}")
private int batchSize;
@Value("${sql.ts_timescale.batch_max_delay:100}")
private long maxDelay;
@Value("${sql.ts_timescale.stats_print_interval_ms:1000}")
private long statsPrintIntervalMs;
private TbSqlBlockingQueue<TimescaleTsKvEntity> queue;
@PostConstruct
private void init() {
TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
.logName("TS Timescale")
.batchSize(batchSize)
.maxDelay(maxDelay)
.statsPrintIntervalMs(statsPrintIntervalMs)
.build();
queue = new TbSqlBlockingQueue<>(params);
queue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
}
@PreDestroy
private void destroy() {
if (queue != null) {
queue.destroy();
}
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
@ -126,11 +165,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
log.trace("Saving entity to timescale db: {}", entity);
return insertService.submit(() -> {
insertRepository.saveOrUpdate(entity);
return null;
});
return queue.add(entity);
}
@Override
@ -209,7 +244,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
List<Optional<TsKvEntry>> result = new ArrayList<>();
timescaleTsKvEntities.forEach(entity -> {
if(entity != null && entity.isNotEmpty()) {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityIdStr);
entity.setTenantId(tenantIdStr);
entity.setKey(key);

View File

@ -22,6 +22,8 @@ import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.util.List;
@SqlTsDao
@HsqlDao
@Repository
@ -40,6 +42,11 @@ public class HsqlLatestInsertRepository extends AbstractLatestInsertRepository {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
public void saveOrUpdate(List<TsKvLatestEntity> entities) {
}
@Override
protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)

View File

@ -22,6 +22,8 @@ import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.util.List;
@SqlTsDao
@HsqlDao
@Repository
@ -40,6 +42,11 @@ public class HsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
public void saveOrUpdate(List<TsKvEntity> entities) {
}
@Override
protected void saveOrUpdateBoolean(TsKvEntity entity, String query) {
entityManager.createNativeQuery(query)

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@ -38,6 +39,9 @@ import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
@ -46,6 +50,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -73,6 +79,63 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
@Autowired
private AbstractLatestInsertRepository insertLatestRepository;
@Autowired
ScheduledLogExecutorComponent logExecutor;
@Value("${sql.ts.batch_size:1000}")
private int tsBatchSize;
@Value("${sql.ts.batch_max_delay:100}")
private long tsMaxDelay;
@Value("${sql.ts.stats_print_interval_ms:1000}")
private long tsStatsPrintIntervalMs;
@Value("${sql.ts_latest.batch_size:1000}")
private int tsLatestBatchSize;
@Value("${sql.ts_latest.batch_max_delay:100}")
private long tsLatestMaxDelay;
@Value("${sql.ts_latest.stats_print_interval_ms:1000}")
private long tsLatestStatsPrintIntervalMs;
private TbSqlBlockingQueue<TsKvEntity> tsQueue;
private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue;
@PostConstruct
private void init() {
TbSqlBlockingQueueParams tsParams = TbSqlBlockingQueueParams.builder()
.logName("TS")
.batchSize(tsBatchSize)
.maxDelay(tsMaxDelay)
.statsPrintIntervalMs(tsStatsPrintIntervalMs)
.build();
tsQueue = new TbSqlBlockingQueue<>(tsParams);
tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
TbSqlBlockingQueueParams tsLatestParams = TbSqlBlockingQueueParams.builder()
.logName("TS Latest")
.batchSize(tsLatestBatchSize)
.maxDelay(tsLatestMaxDelay)
.statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
.build();
tsLatestQueue = new TbSqlBlockingQueue<>(tsLatestParams);
tsLatestQueue.init(logExecutor, v -> insertLatestRepository.saveOrUpdate(v));
}
@PreDestroy
private void destroy() {
if (tsQueue != null) {
tsQueue.destroy();
}
if (tsLatestQueue != null) {
tsLatestQueue.destroy();
}
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
@ -266,10 +329,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
log.trace("Saving entity: {}", entity);
return insertService.submit(() -> {
insertRepository.saveOrUpdate(entity);
return null;
});
return tsQueue.add(entity);
}
@Override
@ -288,10 +348,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null));
latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
return insertService.submit(() -> {
insertLatestRepository.saveOrUpdate(latestEntity);
return null;
});
return tsLatestQueue.add(latestEntity);
}
@Override

View File

@ -15,13 +15,22 @@
*/
package org.thingsboard.server.dao.sqlts.ts;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@SqlTsDao
@PsqlDao
@Repository
@ -35,11 +44,121 @@ public class PsqlLatestInsertRepository extends AbstractLatestInsertRepository {
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
private static final String BATCH_UPDATE =
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_type = ? AND entity_id = ? and key = ?";
private static final String INSERT_OR_UPDATE =
"INSERT INTO ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (entity_type, entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;";
@Override
public void saveOrUpdate(TsKvLatestEntity entity) {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
public void saveOrUpdate(List<TsKvLatestEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setLong(1, entities.get(i).getTs());
if (entities.get(i).getBooleanValue() != null) {
ps.setBoolean(2, entities.get(i).getBooleanValue());
} else {
ps.setNull(2, Types.BOOLEAN);
}
ps.setString(3, replaceNullChars(entities.get(i).getStrValue()));
if (entities.get(i).getLongValue() != null) {
ps.setLong(4, entities.get(i).getLongValue());
} else {
ps.setNull(4, Types.BIGINT);
}
if (entities.get(i).getDoubleValue() != null) {
ps.setDouble(5, entities.get(i).getDoubleValue());
} else {
ps.setNull(5, Types.DOUBLE);
}
ps.setString(6, entities.get(i).getEntityType().name());
ps.setString(7, entities.get(i).getEntityId());
ps.setString(8, entities.get(i).getKey());
}
@Override
public int getBatchSize() {
return entities.size();
}
});
int updatedCount = 0;
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
updatedCount++;
}
}
List<TsKvLatestEntity> insertEntities = new ArrayList<>(updatedCount);
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
insertEntities.add(entities.get(i));
}
}
jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, insertEntities.get(i).getEntityType().name());
ps.setString(2, insertEntities.get(i).getEntityId());
ps.setString(3, insertEntities.get(i).getKey());
ps.setLong(4, insertEntities.get(i).getTs());
ps.setLong(9, insertEntities.get(i).getTs());
if (insertEntities.get(i).getBooleanValue() != null) {
ps.setBoolean(5, insertEntities.get(i).getBooleanValue());
ps.setBoolean(10, insertEntities.get(i).getBooleanValue());
} else {
ps.setNull(5, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
}
ps.setString(6, replaceNullChars(entities.get(i).getStrValue()));
ps.setString(11, replaceNullChars(entities.get(i).getStrValue()));
if (insertEntities.get(i).getLongValue() != null) {
ps.setLong(7, insertEntities.get(i).getLongValue());
ps.setLong(12, insertEntities.get(i).getLongValue());
} else {
ps.setNull(7, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
}
if (insertEntities.get(i).getDoubleValue() != null) {
ps.setDouble(8, insertEntities.get(i).getDoubleValue());
ps.setDouble(13, insertEntities.get(i).getDoubleValue());
} else {
ps.setNull(8, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
}
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
});
}
});
}
@Override
protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.dao.sqlts.ts;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
@ -22,6 +23,11 @@ import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
@SqlTsDao
@PsqlDao
@Repository
@ -35,6 +41,10 @@ public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE =
"INSERT INTO ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (entity_type, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;";
@Override
public void saveOrUpdate(TsKvEntity entity) {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
@ -83,4 +93,50 @@ public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepo
.setParameter("dbl_v", entity.getDoubleValue())
.executeUpdate();
}
@Override
public void saveOrUpdate(List<TsKvEntity> entities) {
jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, entities.get(i).getEntityType().name());
ps.setString(2, entities.get(i).getEntityId());
ps.setString(3, entities.get(i).getKey());
ps.setLong(4, entities.get(i).getTs());
if (entities.get(i).getBooleanValue() != null) {
ps.setBoolean(5, entities.get(i).getBooleanValue());
ps.setBoolean(9, entities.get(i).getBooleanValue());
} else {
ps.setNull(5, Types.BOOLEAN);
ps.setNull(9, Types.BOOLEAN);
}
ps.setString(6, replaceNullChars(entities.get(i).getStrValue()));
ps.setString(10, replaceNullChars(entities.get(i).getStrValue()));
if (entities.get(i).getLongValue() != null) {
ps.setLong(7, entities.get(i).getLongValue());
ps.setLong(11, entities.get(i).getLongValue());
} else {
ps.setNull(7, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
if (entities.get(i).getDoubleValue() != null) {
ps.setDouble(8, entities.get(i).getDoubleValue());
ps.setDouble(12, entities.get(i).getDoubleValue());
} else {
ps.setNull(8, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
}
@Override
public int getBatchSize() {
return entities.size();
}
});
}
}