formatting ,added INDEFINITE to possible partitioning in yml ,savePartition returns immediateFuture(null) if FixedPartition mode enabled
This commit is contained in:
parent
88307d6ef1
commit
2b212d777c
@ -201,7 +201,7 @@ cassandra:
|
|||||||
read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
|
read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
|
||||||
write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
|
write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
|
||||||
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
|
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
|
||||||
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
|
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
|
||||||
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
|
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
|
||||||
ts_key_value_ttl: "${TS_KV_TTL:0}"
|
ts_key_value_ttl: "${TS_KV_TTL:0}"
|
||||||
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
|
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -163,13 +163,13 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFixedPartitioning(){
|
public boolean isFixedPartitioning() {
|
||||||
return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION);
|
return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition){
|
private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
|
||||||
|
|
||||||
if(isFixedPartitioning()){ //no need to fetch partitions from DB
|
if (isFixedPartitioning()) { //no need to fetch partitions from DB
|
||||||
return Futures.immediateFuture(FIXED_PARTITION);
|
return Futures.immediateFuture(FIXED_PARTITION);
|
||||||
}
|
}
|
||||||
ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
|
ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
|
||||||
@ -183,7 +183,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
long minPartition = toPartitionTs(query.getStartTs());
|
long minPartition = toPartitionTs(query.getStartTs());
|
||||||
long maxPartition = toPartitionTs(query.getEndTs());
|
long maxPartition = toPartitionTs(query.getEndTs());
|
||||||
|
|
||||||
final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query,entityId,minPartition,maxPartition);
|
final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
|
||||||
final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
|
final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
|
||||||
|
|
||||||
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
|
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
|
||||||
@ -244,7 +244,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
final long ts = startTs + (endTs - startTs) / 2;
|
final long ts = startTs + (endTs - startTs) / 2;
|
||||||
|
|
||||||
|
|
||||||
ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query,entityId, minPartition, maxPartition);
|
ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
|
||||||
ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
|
ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
|
||||||
getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
|
getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
|
||||||
|
|
||||||
@ -320,8 +320,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
||||||
if(isFixedPartitioning()){
|
if (isFixedPartitioning()) {
|
||||||
return getFuture(null, rs -> null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
ttl = computeTtl(ttl);
|
ttl = computeTtl(ttl);
|
||||||
long partition = toPartitionTs(tsKvEntryTs);
|
long partition = toPartitionTs(tsKvEntryTs);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user