Prepared Statement initialization lock

This commit is contained in:
Andrii Shvaika 2021-04-14 13:49:09 +03:00
parent ae5632f8ab
commit 123457f8eb

View File

@ -62,6 +62,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
@ -107,6 +109,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement[] fetchStmtsDesc; private PreparedStatement[] fetchStmtsDesc;
private PreparedStatement deleteStmt; private PreparedStatement deleteStmt;
private PreparedStatement deletePartitionStmt; private PreparedStatement deletePartitionStmt;
private final Lock stmtCreationLock = new ReentrantLock();
private boolean isInstall() { private boolean isInstall() {
return environment.acceptsProfiles(Profiles.of("install")); return environment.acceptsProfiles(Profiles.of("install"));
@ -545,13 +548,20 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement getDeleteStmt() { private PreparedStatement getDeleteStmt() {
if (deleteStmt == null) { if (deleteStmt == null) {
deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF + stmtCreationLock.lock();
" WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM try {
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM if (deleteStmt == null) {
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? " + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " < ?"); + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? "
+ "AND " + ModelConstants.TS_COLUMN + " < ?");
}
} finally {
stmtCreationLock.unlock();
}
} }
return deleteStmt; return deleteStmt;
} }
@ -585,27 +595,41 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement getDeletePartitionStmt() { private PreparedStatement getDeletePartitionStmt() {
if (deletePartitionStmt == null) { if (deletePartitionStmt == null) {
deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF + stmtCreationLock.lock();
" WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM try {
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM if (deletePartitionStmt == null) {
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM); " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
}
} finally {
stmtCreationLock.unlock();
}
} }
return deletePartitionStmt; return deletePartitionStmt;
} }
private PreparedStatement getSaveStmt(DataType dataType) { private PreparedStatement getSaveStmt(DataType dataType) {
if (saveStmts == null) { if (saveStmts == null) {
saveStmts = new PreparedStatement[DataType.values().length]; stmtCreationLock.lock();
for (DataType type : DataType.values()) { try {
saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + if (saveStmts == null) {
"(" + ModelConstants.ENTITY_TYPE_COLUMN + saveStmts = new PreparedStatement[DataType.values().length];
"," + ModelConstants.ENTITY_ID_COLUMN + for (DataType type : DataType.values()) {
"," + ModelConstants.KEY_COLUMN + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
"," + ModelConstants.PARTITION_COLUMN + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.TS_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN +
"," + getColumnName(type) + ")" + "," + ModelConstants.KEY_COLUMN +
" VALUES(?, ?, ?, ?, ?, ?)"); "," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.TS_COLUMN +
"," + getColumnName(type) + ")" +
" VALUES(?, ?, ?, ?, ?, ?)");
}
}
} finally {
stmtCreationLock.unlock();
} }
} }
return saveStmts[dataType.ordinal()]; return saveStmts[dataType.ordinal()];
@ -613,16 +637,23 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement getSaveTtlStmt(DataType dataType) { private PreparedStatement getSaveTtlStmt(DataType dataType) {
if (saveTtlStmts == null) { if (saveTtlStmts == null) {
saveTtlStmts = new PreparedStatement[DataType.values().length]; stmtCreationLock.lock();
for (DataType type : DataType.values()) { try {
saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + if (saveTtlStmts == null) {
"(" + ModelConstants.ENTITY_TYPE_COLUMN + saveTtlStmts = new PreparedStatement[DataType.values().length];
"," + ModelConstants.ENTITY_ID_COLUMN + for (DataType type : DataType.values()) {
"," + ModelConstants.KEY_COLUMN + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
"," + ModelConstants.PARTITION_COLUMN + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.TS_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN +
"," + getColumnName(type) + ")" + "," + ModelConstants.KEY_COLUMN +
" VALUES(?, ?, ?, ?, ?, ?) USING TTL ?"); "," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.TS_COLUMN +
"," + getColumnName(type) + ")" +
" VALUES(?, ?, ?, ?, ?, ?) USING TTL ?");
}
}
} finally {
stmtCreationLock.unlock();
} }
} }
return saveTtlStmts[dataType.ordinal()]; return saveTtlStmts[dataType.ordinal()];
@ -630,24 +661,38 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement getPartitionInsertStmt() { private PreparedStatement getPartitionInsertStmt() {
if (partitionInsertStmt == null) { if (partitionInsertStmt == null) {
partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + stmtCreationLock.lock();
"(" + ModelConstants.ENTITY_TYPE_COLUMN + try {
"," + ModelConstants.ENTITY_ID_COLUMN + if (partitionInsertStmt == null) {
"," + ModelConstants.PARTITION_COLUMN + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"," + ModelConstants.KEY_COLUMN + ")" + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
" VALUES(?, ?, ?, ?)"); "," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?)");
}
} finally {
stmtCreationLock.unlock();
}
} }
return partitionInsertStmt; return partitionInsertStmt;
} }
private PreparedStatement getPartitionInsertTtlStmt() { private PreparedStatement getPartitionInsertTtlStmt() {
if (partitionInsertTtlStmt == null) { if (partitionInsertTtlStmt == null) {
partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + stmtCreationLock.lock();
"(" + ModelConstants.ENTITY_TYPE_COLUMN + try {
"," + ModelConstants.ENTITY_ID_COLUMN + if (partitionInsertTtlStmt == null) {
"," + ModelConstants.PARTITION_COLUMN + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"," + ModelConstants.KEY_COLUMN + ")" + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
" VALUES(?, ?, ?, ?) USING TTL ?"); "," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?) USING TTL ?");
}
} finally {
stmtCreationLock.unlock();
}
} }
return partitionInsertTtlStmt; return partitionInsertTtlStmt;
} }
@ -713,12 +758,26 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
switch (orderBy) { switch (orderBy) {
case ASC_ORDER: case ASC_ORDER:
if (fetchStmtsAsc == null) { if (fetchStmtsAsc == null) {
fetchStmtsAsc = initFetchStmt(orderBy); stmtCreationLock.lock();
try {
if (fetchStmtsAsc == null) {
fetchStmtsAsc = initFetchStmt(orderBy);
}
} finally {
stmtCreationLock.unlock();
}
} }
return fetchStmtsAsc[aggType.ordinal()]; return fetchStmtsAsc[aggType.ordinal()];
case DESC_ORDER: case DESC_ORDER:
if (fetchStmtsDesc == null) { if (fetchStmtsDesc == null) {
fetchStmtsDesc = initFetchStmt(orderBy); stmtCreationLock.lock();
try {
if (fetchStmtsDesc == null) {
fetchStmtsDesc = initFetchStmt(orderBy);
}
} finally {
stmtCreationLock.unlock();
}
} }
return fetchStmtsDesc[aggType.ordinal()]; return fetchStmtsDesc[aggType.ordinal()];
default: default: