From 7d79fa0a628d5656ee0ce3b22e9fe9d9d3425b78 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 17 Nov 2022 11:07:15 +0100 Subject: [PATCH] refactoring --- .../src/main/resources/thingsboard.yml | 2 +- .../CassandraBaseTimeseriesDao.java | 19 +++++++++++-------- ...esDaoPartitioningDaysAlwaysExistsTest.java | 2 +- ...sDaoPartitioningHoursAlwaysExistsTest.java | 2 +- ...artitioningIndefiniteAlwaysExistsTest.java | 2 +- ...aoPartitioningMinutesAlwaysExistsTest.java | 2 +- ...DaoPartitioningMonthsAlwaysExistsTest.java | 2 +- ...sDaoPartitioningYearsAlwaysExistsTest.java | 2 +- 8 files changed, 18 insertions(+), 15 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index fcf62a4262..6028a843ea 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,7 +227,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" - ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}" + use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}" ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index ffc52787f9..737ef8d8f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -94,8 +94,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private String partitioning; @Getter - @Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}") - private boolean partitioningAlwaysExistInReading; + @Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}") + private boolean useTsKeyValuePartitioningOnRead; @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -381,7 +381,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } - if (isPartitioningAlwaysExistInReading()) { + if (!isUseTsKeyValuePartitioningOnRead()) { return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); } TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); @@ -393,20 +393,23 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return Collections.singletonList(minPartition); } List partitions = new ArrayList<>(); - partitions.add(minPartition); long currentPartition = minPartition; - while (maxPartition > (currentPartition = calculateNextPartition(currentPartition))) { + LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC); + + while (maxPartition > currentPartition) { partitions.add(currentPartition); + currentPartitionTime = calculateNextPartition(currentPartitionTime); + currentPartition = currentPartitionTime.toInstant(ZoneOffset.UTC).toEpochMilli(); } partitions.add(maxPartition); + return partitions; } - private long calculateNextPartition(long ts) { - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); - return time.plus(1, tsFormat.getTruncateUnit()).toInstant(ZoneOffset.UTC).toEpochMilli(); + private LocalDateTime calculateNextPartition(LocalDateTime time) { + return time.plus(1, tsFormat.getTruncateUnit()); } private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java index 4b043b660f..d5e8350104 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=DAYS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java index 4ac057aae0..d5381653b3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=HOURS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java index f83632c229..82f94cde36 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=INDEFINITE", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java index 056ee55562..ffdf18c42b 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=MINUTES", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java index f9a90e0d41..8c7d12da86 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=MONTHS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java index 3afca0b244..4b2bcb6455 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=YEARS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",