refactoring
This commit is contained in:
		
							parent
							
								
									ec2305df64
								
							
						
					
					
						commit
						7d79fa0a62
					
				@ -227,7 +227,7 @@ cassandra:
 | 
			
		||||
    default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
 | 
			
		||||
    # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
 | 
			
		||||
    ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
 | 
			
		||||
    ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}"
 | 
			
		||||
    use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}"
 | 
			
		||||
    ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
 | 
			
		||||
    ts_key_value_ttl: "${TS_KV_TTL:0}"
 | 
			
		||||
    buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
 | 
			
		||||
 | 
			
		||||
@ -94,8 +94,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
 | 
			
		||||
    private String partitioning;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    @Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}")
 | 
			
		||||
    private boolean partitioningAlwaysExistInReading;
 | 
			
		||||
    @Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}")
 | 
			
		||||
    private boolean useTsKeyValuePartitioningOnRead;
 | 
			
		||||
 | 
			
		||||
    @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
 | 
			
		||||
    private long partitionsCacheSize;
 | 
			
		||||
@ -381,7 +381,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
 | 
			
		||||
        if (isFixedPartitioning()) { //no need to fetch partitions from DB
 | 
			
		||||
            return Futures.immediateFuture(FIXED_PARTITION);
 | 
			
		||||
        }
 | 
			
		||||
        if (isPartitioningAlwaysExistInReading()) {
 | 
			
		||||
        if (!isUseTsKeyValuePartitioningOnRead()) {
 | 
			
		||||
            return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition));
 | 
			
		||||
        }
 | 
			
		||||
        TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
 | 
			
		||||
@ -393,20 +393,23 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
 | 
			
		||||
            return Collections.singletonList(minPartition);
 | 
			
		||||
        }
 | 
			
		||||
        List<Long> partitions = new ArrayList<>();
 | 
			
		||||
        partitions.add(minPartition);
 | 
			
		||||
 | 
			
		||||
        long currentPartition = minPartition;
 | 
			
		||||
        while (maxPartition > (currentPartition = calculateNextPartition(currentPartition))) {
 | 
			
		||||
        LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC);
 | 
			
		||||
 | 
			
		||||
        while (maxPartition > currentPartition) {
 | 
			
		||||
            partitions.add(currentPartition);
 | 
			
		||||
            currentPartitionTime = calculateNextPartition(currentPartitionTime);
 | 
			
		||||
            currentPartition = currentPartitionTime.toInstant(ZoneOffset.UTC).toEpochMilli();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        partitions.add(maxPartition);
 | 
			
		||||
 | 
			
		||||
        return partitions;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private long calculateNextPartition(long ts) {
 | 
			
		||||
        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
 | 
			
		||||
        return time.plus(1, tsFormat.getTruncateUnit()).toInstant(ZoneOffset.UTC).toEpochMilli();
 | 
			
		||||
    private LocalDateTime calculateNextPartition(LocalDateTime time) {
 | 
			
		||||
        return time.plus(1, tsFormat.getTruncateUnit());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private AsyncFunction<List<Long>, List<TbResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
 | 
			
		||||
 | 
			
		||||
@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=DAYS",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=HOURS",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=INDEFINITE",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=MINUTES",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=MONTHS",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "database.ts.type=cassandra",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning=YEARS",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
 | 
			
		||||
        "cassandra.query.use_ts_key_value_partitioning_on_read=false",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_max_cache_size=100000",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
 | 
			
		||||
        "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user