Merge pull request #14087 from smatvienko-tb/fix/cassandra_partition_estimation

Cassandra DAO: Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value
This commit is contained in:
Viacheslav Klimov 2025-10-03 16:58:31 +03:00 committed by GitHub
commit e2658bc61a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 162 additions and 15 deletions

View File

@ -303,8 +303,11 @@ cassandra:
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
# Enable/Disable timestamp key-value partioning on read queries
# Enable/Disable timestamp key-value partitioning on read queries
use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}"
# Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value.
# It helps to prevent building huge partition list (OOM) for corner cases (like from 0 to infinity) and prefer fewer reads strategy from NoSQL database
use_ts_key_value_partitioning_on_read_max_estimated_partition_count: "${USE_TS_KV_PARTITIONING_ON_READ_MAX_ESTIMATED_PARTITION_COUNT:40}"
# The number of partitions that are cached in memory of each service. It is useful to decrease the load of re-inserting the same partitions again
ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
# Timeseries Time To Live (in seconds) for Cassandra Record. 0 - record has never expired

View File

@ -113,6 +113,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}")
private boolean useTsKeyValuePartitioningOnRead;
@Getter
@Value("${cassandra.query.use_ts_key_value_partitioning_on_read_max_estimated_partition_count:40}") // 3+ years for MONTHS
private int useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount;
@Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
private long partitionsCacheSize;
@ -415,22 +419,41 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
readResultsProcessingExecutor);
}
private ListenableFuture<List<Long>> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
ListenableFuture<List<Long>> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
if (isFixedPartitioning()) { //no need to fetch partitions from DB
return Futures.immediateFuture(FIXED_PARTITION);
}
if (!isUseTsKeyValuePartitioningOnRead()) {
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition));
final long estimatedPartitionCount = estimatePartitionCount(minPartition, maxPartition);
if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) {
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition, (int) estimatedPartitionCount));
}
}
return getPartitionsFromDB(tenantId, query, entityId, minPartition, maxPartition);
}
ListenableFuture<List<Long>> getPartitionsFromDB(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
}
// Optimistic estimation of partition count, expected to be never called for infinite partitioning
long estimatePartitionCount(long minPartition, long maxPartition) {
if (maxPartition > minPartition) {
return (maxPartition - minPartition) / tsFormat.getDurationMs() + 2; //at least 2 partitions, at max 2 partitions overestimated
}
return 1; // 1 or 0, but 1 is more optimistic
}
List<Long> calculatePartitions(long minPartition, long maxPartition) {
return calculatePartitions(minPartition, maxPartition, 0);
}
List<Long> calculatePartitions(long minPartition, long maxPartition, int estimatedPartitionCount) {
if (minPartition == maxPartition) {
return Collections.singletonList(minPartition);
}
List<Long> partitions = new ArrayList<>();
List<Long> partitions = estimatedPartitionCount > 0 ? new ArrayList<>(estimatedPartitionCount) : new ArrayList<>();
long currentPartition = minPartition;
LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC);

View File

@ -15,32 +15,29 @@
*/
package org.thingsboard.server.dao.timeseries;
import lombok.Getter;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Getter
public enum NoSqlTsPartitionDate {
MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER);
private final String pattern;
private final transient TemporalUnit truncateUnit;
private final transient long durationMs;
public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC);
NoSqlTsPartitionDate(String pattern, TemporalUnit truncateUnit) {
this.pattern = pattern;
this.truncateUnit = truncateUnit;
}
public String getPattern() {
return pattern;
}
public TemporalUnit getTruncateUnit() {
return truncateUnit;
this.durationMs = TimeUnit.SECONDS.toMillis(this.truncateUnit.getDuration().getSeconds());
}
public LocalDateTime truncatedTo(LocalDateTime time) {

View File

@ -117,4 +117,16 @@ public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest {
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()));
}
@Test
public void testEstimatePartitionCount() throws ParseException {
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(153_722_867_280_914L);
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
long startTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("600,479 minutes + 2 spare periods").isEqualTo(600479 + 2);
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
}
}

View File

@ -15,25 +15,35 @@
*/
package org.thingsboard.server.dao.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import java.util.UUID;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@ -50,7 +60,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest {
@Autowired
@MockitoSpyBean
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@ -131,4 +141,53 @@ public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest {
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
}
@Test
public void testEstimatePartitionCount() throws ParseException {
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(3_507_324_297L);
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
long startTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("13 month + 2 spare periods").isEqualTo(13 + 2);
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
}
@Test
public void testGetPartitionsFutureModeratePartitionsCount() throws ParseException {
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TsKvQuery query = mock(TsKvQuery.class);
long startTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs);
verify(tsDao).estimatePartitionCount(startTs, endTs);
verify(tsDao).calculatePartitions(eq(startTs), eq(endTs), anyInt());
verify(tsDao, never()).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
}
@Test
public void testGetPartitionsFutureHugePartitionsCountPreventOOMFallbackToDB() throws ParseException {
TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
TsKvQuery query = mock(TsKvQuery.class);
long startTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2000-12-12T00:00:00Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("3000-01-31T23:59:59Z").getTime());
willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs);
verify(tsDao).estimatePartitionCount(startTs, endTs);
verify(tsDao, never()).calculatePartitions(eq(startTs), eq(endTs), anyInt());
verify(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs);
}
}

View File

@ -112,4 +112,16 @@ public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest {
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime()));
}
@Test
public void testEstimatePartitionCount() throws ParseException {
assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(292_277_026L);
assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L);
long startTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("2 years + 2 spare periods").isEqualTo(2 + 2);
assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L);
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static org.assertj.core.api.Assertions.assertThat;
class NoSqlTsPartitionDateTest {
@ParameterizedTest
@EnumSource(NoSqlTsPartitionDate.class)
void getDurationMsTest(NoSqlTsPartitionDate tsPartitionDate) throws Exception {
final Long durationMs = switch (tsPartitionDate) {
case MINUTES -> 60000L;
case HOURS -> 3600000L;
case DAYS -> 86400000L;
case MONTHS -> 2629746000L;
case YEARS -> 31556952000L;
case INDEFINITE -> Long.MAX_VALUE;
default -> null; //should be here in case a new enum value will be added in future
};
assertThat(durationMs).isNotNull();
assertThat(tsPartitionDate.getDurationMs()).isEqualTo(durationMs);
}
}