Cassandra DAO: Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value.
This commit is contained in:
parent
2e7a285f7d
commit
f672dbede7
@ -303,8 +303,11 @@ cassandra:
|
||||
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
|
||||
# Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
|
||||
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
|
||||
# Enable/Disable timestamp key-value partioning on read queries
|
||||
# Enable/Disable timestamp key-value partitioning on read queries
|
||||
use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}"
|
||||
# Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value.
|
||||
# It helps to prevent building huge partition list (OOM) for corner cases (like from 0 to infinity) and prefer fewer reads strategy from NoSQL database
|
||||
use_ts_key_value_partitioning_on_read_max_estimated_partition_count: "${USE_TS_KV_PARTITIONING_ON_READ_MAX_ESTIMATED_PARTITION_COUNT:40}"
|
||||
# The number of partitions that are cached in memory of each service. It is useful to decrease the load of re-inserting the same partitions again
|
||||
ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
|
||||
# Timeseries Time To Live (in seconds) for Cassandra Record. 0 - record has never expired
|
||||
|
||||
@ -113,6 +113,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
|
||||
@Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}")
|
||||
private boolean useTsKeyValuePartitioningOnRead;
|
||||
|
||||
@Getter
|
||||
@Value("${cassandra.query.use_ts_key_value_partitioning_on_read_max_estimated_partition_count:40}") // 3+ years for MONTHS
|
||||
private int useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount;
|
||||
|
||||
@Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
|
||||
private long partitionsCacheSize;
|
||||
|
||||
@ -415,22 +419,41 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
|
||||
readResultsProcessingExecutor);
|
||||
}
|
||||
|
||||
private ListenableFuture<List<Long>> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
|
||||
ListenableFuture<List<Long>> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
|
||||
if (isFixedPartitioning()) { //no need to fetch partitions from DB
|
||||
return Futures.immediateFuture(FIXED_PARTITION);
|
||||
}
|
||||
if (!isUseTsKeyValuePartitioningOnRead()) {
|
||||
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition));
|
||||
final long estimatedPartitionCount = estimatePartitionCount(minPartition, maxPartition);
|
||||
if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) {
|
||||
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition, (int) estimatedPartitionCount));
|
||||
}
|
||||
}
|
||||
return getPartitionsFromDB(tenantId, query, entityId, minPartition, maxPartition);
|
||||
}
|
||||
|
||||
ListenableFuture<List<Long>> getPartitionsFromDB(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
|
||||
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
|
||||
return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
|
||||
}
|
||||
|
||||
// Optimistic estimation of partition count, expected to be never called for infinite partitioning
|
||||
long estimatePartitionCount(long minPartition, long maxPartition) {
|
||||
if (maxPartition > minPartition) {
|
||||
return (maxPartition - minPartition) / tsFormat.getDurationMs() + 2; //at least 2 partitions, at max 2 partitions overestimated
|
||||
}
|
||||
return 1; // 1 or 0, but 1 is more optimistic
|
||||
}
|
||||
|
||||
List<Long> calculatePartitions(long minPartition, long maxPartition) {
|
||||
return calculatePartitions(minPartition, maxPartition, 0);
|
||||
}
|
||||
|
||||
List<Long> calculatePartitions(long minPartition, long maxPartition, int estimatedPartitionCount) {
|
||||
if (minPartition == maxPartition) {
|
||||
return Collections.singletonList(minPartition);
|
||||
}
|
||||
List<Long> partitions = new ArrayList<>();
|
||||
List<Long> partitions = estimatedPartitionCount > 0 ? new ArrayList<>(estimatedPartitionCount) : new ArrayList<>();
|
||||
|
||||
long currentPartition = minPartition;
|
||||
LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC);
|
||||
|
||||
@ -15,32 +15,29 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.timeseries;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.TemporalUnit;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Getter
|
||||
public enum NoSqlTsPartitionDate {
|
||||
|
||||
MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER);
|
||||
|
||||
private final String pattern;
|
||||
private final transient TemporalUnit truncateUnit;
|
||||
private final transient long durationMs;
|
||||
public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC);
|
||||
|
||||
NoSqlTsPartitionDate(String pattern, TemporalUnit truncateUnit) {
|
||||
this.pattern = pattern;
|
||||
this.truncateUnit = truncateUnit;
|
||||
}
|
||||
|
||||
|
||||
public String getPattern() {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
public TemporalUnit getTruncateUnit() {
|
||||
return truncateUnit;
|
||||
this.durationMs = TimeUnit.SECONDS.toMillis(this.truncateUnit.getDuration().getSeconds());
|
||||
}
|
||||
|
||||
public LocalDateTime truncatedTo(LocalDateTime time) {
|
||||
|
||||
@ -117,4 +117,16 @@ public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest {
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimatePartitionCount() throws ParseException {
|
||||
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(153_722_867_280_914L);
|
||||
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
|
||||
long startTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
|
||||
long endTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
|
||||
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("600,479 minutes + 2 spare periods").isEqualTo(600479 + 2);
|
||||
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,25 +15,35 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.timeseries;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.TsKvQuery;
|
||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
|
||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.BDDMockito.willReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
|
||||
@ -50,7 +60,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
@Slf4j
|
||||
public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest {
|
||||
|
||||
@Autowired
|
||||
@MockitoSpyBean
|
||||
CassandraBaseTimeseriesDao tsDao;
|
||||
|
||||
@MockBean(answer = Answers.RETURNS_MOCKS)
|
||||
@ -131,4 +141,53 @@ public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest {
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimatePartitionCount() throws ParseException {
|
||||
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(3_507_324_297L);
|
||||
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
|
||||
long startTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
|
||||
long endTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
|
||||
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("13 month + 2 spare periods").isEqualTo(13 + 2);
|
||||
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPartitionsFutureModeratePartitionsCount() throws ParseException {
|
||||
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
|
||||
TsKvQuery query = mock(TsKvQuery.class);
|
||||
long startTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
|
||||
long endTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
|
||||
|
||||
willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
|
||||
|
||||
tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs);
|
||||
|
||||
verify(tsDao).estimatePartitionCount(startTs, endTs);
|
||||
verify(tsDao).calculatePartitions(eq(startTs), eq(endTs), anyInt());
|
||||
verify(tsDao, never()).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPartitionsFutureHugePartitionsCountPreventOOMFallbackToDB() throws ParseException {
|
||||
|
||||
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
|
||||
TsKvQuery query = mock(TsKvQuery.class);
|
||||
long startTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2000-12-12T00:00:00Z").getTime());
|
||||
long endTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("3000-01-31T23:59:59Z").getTime());
|
||||
|
||||
willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
|
||||
|
||||
tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs);
|
||||
|
||||
verify(tsDao).estimatePartitionCount(startTs, endTs);
|
||||
verify(tsDao, never()).calculatePartitions(eq(startTs), eq(endTs), anyInt());
|
||||
verify(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -112,4 +112,16 @@ public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest {
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEstimatePartitionCount() throws ParseException {
|
||||
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(292_277_026L);
|
||||
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
|
||||
long startTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
|
||||
long endTs = tsDao.toPartitionTs(
|
||||
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
|
||||
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("2 years + 2 spare periods").isEqualTo(2 + 2);
|
||||
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.timeseries;
|
||||
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class NoSqlTsPartitionDateTest {
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(NoSqlTsPartitionDate.class)
|
||||
void getDurationMsTest(NoSqlTsPartitionDate tsPartitionDate) throws Exception {
|
||||
final Long durationMs = switch (tsPartitionDate) {
|
||||
case MINUTES -> 60000L;
|
||||
case HOURS -> 3600000L;
|
||||
case DAYS -> 86400000L;
|
||||
case MONTHS -> 2629746000L;
|
||||
case YEARS -> 31556952000L;
|
||||
case INDEFINITE -> Long.MAX_VALUE;
|
||||
default -> null; //should be here in case a new enum value will be added in future
|
||||
};
|
||||
assertThat(durationMs).isNotNull();
|
||||
assertThat(tsPartitionDate.getDurationMs()).isEqualTo(durationMs);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user