From f672dbede7741cf694ccdaf0bf0c12dbfdad3ae3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 30 Sep 2025 17:02:23 +0200 Subject: [PATCH] Cassandra DAO: Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value. --- .../src/main/resources/thingsboard.yml | 5 +- .../CassandraBaseTimeseriesDao.java | 29 ++++++++- .../dao/timeseries/NoSqlTsPartitionDate.java | 15 ++--- ...aoPartitioningMinutesAlwaysExistsTest.java | 12 ++++ ...DaoPartitioningMonthsAlwaysExistsTest.java | 63 ++++++++++++++++++- ...sDaoPartitioningYearsAlwaysExistsTest.java | 12 ++++ .../timeseries/NoSqlTsPartitionDateTest.java | 41 ++++++++++++ 7 files changed, 162 insertions(+), 15 deletions(-) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDateTest.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9fcdd0c166..660e9e3a59 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -303,8 +303,11 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" - # Enable/Disable timestamp key-value partioning on read queries + # Enable/Disable timestamp key-value partitioning on read queries use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}" + # Safety trigger to fall back to use_ts_key_value_partitioning_on_read as true if estimated partitions count is greater than safety trigger value. + # It helps to prevent building huge partition list (OOM) for corner cases (like from 0 to infinity) and prefer fewer reads strategy from NoSQL database + use_ts_key_value_partitioning_on_read_max_estimated_partition_count: "${USE_TS_KV_PARTITIONING_ON_READ_MAX_ESTIMATED_PARTITION_COUNT:40}" # The number of partitions that are cached in memory of each service. It is useful to decrease the load of re-inserting the same partitions again ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" # Timeseries Time To Live (in seconds) for Cassandra Record. 0 - record has never expired diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index f7bd64bef8..6818994629 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -113,6 +113,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}") private boolean useTsKeyValuePartitioningOnRead; + @Getter + @Value("${cassandra.query.use_ts_key_value_partitioning_on_read_max_estimated_partition_count:40}") // 3+ years for MONTHS + private int useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount; + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -415,22 +419,41 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD readResultsProcessingExecutor); } - private ListenableFuture> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { + ListenableFuture> getPartitionsFuture(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } if (!isUseTsKeyValuePartitioningOnRead()) { - return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); + final long estimatedPartitionCount = estimatePartitionCount(minPartition, maxPartition); + if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) { + return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition, (int) estimatedPartitionCount)); + } } + return getPartitionsFromDB(tenantId, query, entityId, minPartition, maxPartition); + } + + ListenableFuture> getPartitionsFromDB(TenantId tenantId, TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } + // Optimistic estimation of partition count, expected to be never called for infinite partitioning + long estimatePartitionCount(long minPartition, long maxPartition) { + if (maxPartition > minPartition) { + return (maxPartition - minPartition) / tsFormat.getDurationMs() + 2; //at least 2 partitions, at max 2 partitions overestimated + } + return 1; // 1 or 0, but 1 is more optimistic + } + List calculatePartitions(long minPartition, long maxPartition) { + return calculatePartitions(minPartition, maxPartition, 0); + } + + List calculatePartitions(long minPartition, long maxPartition, int estimatedPartitionCount) { if (minPartition == maxPartition) { return Collections.singletonList(minPartition); } - List partitions = new ArrayList<>(); + List partitions = estimatedPartitionCount > 0 ? new ArrayList<>(estimatedPartitionCount) : new ArrayList<>(); long currentPartition = minPartition; LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDate.java index 873c33d4e3..c403a46df1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDate.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDate.java @@ -15,32 +15,29 @@ */ package org.thingsboard.server.dao.timeseries; +import lombok.Getter; + import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalUnit; import java.util.Optional; +import java.util.concurrent.TimeUnit; +@Getter public enum NoSqlTsPartitionDate { MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER); private final String pattern; private final transient TemporalUnit truncateUnit; + private final transient long durationMs; public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC); NoSqlTsPartitionDate(String pattern, TemporalUnit truncateUnit) { this.pattern = pattern; this.truncateUnit = truncateUnit; - } - - - public String getPattern() { - return pattern; - } - - public TemporalUnit getTruncateUnit() { - return truncateUnit; + this.durationMs = TimeUnit.SECONDS.toMillis(this.truncateUnit.getDuration().getSeconds()); } public LocalDateTime truncatedTo(LocalDateTime time) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java index cc76484d0b..8ec8c493af 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -117,4 +117,16 @@ public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest { ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime())); } + @Test + public void testEstimatePartitionCount() throws ParseException { + assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(153_722_867_280_914L); + assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L); + long startTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("600,479 minutes + 2 spare periods").isEqualTo(600479 + 2); + assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L); + } + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java index eb60000404..65ed8019c0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java @@ -15,25 +15,35 @@ */ package org.thingsboard.server.dao.timeseries; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoSpyBean; import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; import java.text.ParseException; import java.util.List; +import java.util.UUID; import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; @RunWith(SpringRunner.class) @SpringBootTest(classes = CassandraBaseTimeseriesDao.class) @@ -50,7 +60,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest { - @Autowired + @MockitoSpyBean CassandraBaseTimeseriesDao tsDao; @MockBean(answer = Answers.RETURNS_MOCKS) @@ -131,4 +141,53 @@ public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest { ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); } + @Test + public void testEstimatePartitionCount() throws ParseException { + assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(3_507_324_297L); + assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L); + long startTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("13 month + 2 spare periods").isEqualTo(13 + 2); + assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L); + } + + @Test + public void testGetPartitionsFutureModeratePartitionsCount() throws ParseException { + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TsKvQuery query = mock(TsKvQuery.class); + long startTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + + willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs); + + tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs); + + verify(tsDao).estimatePartitionCount(startTs, endTs); + verify(tsDao).calculatePartitions(eq(startTs), eq(endTs), anyInt()); + verify(tsDao, never()).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs); + } + + @Test + public void testGetPartitionsFutureHugePartitionsCountPreventOOMFallbackToDB() throws ParseException { + + TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + TsKvQuery query = mock(TsKvQuery.class); + long startTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2000-12-12T00:00:00Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("3000-01-31T23:59:59Z").getTime()); + + willReturn(mock(ListenableFuture.class)).given(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs); + + tsDao.getPartitionsFuture(tenantId, query, tenantId, startTs, endTs); + + verify(tsDao).estimatePartitionCount(startTs, endTs); + verify(tsDao, never()).calculatePartitions(eq(startTs), eq(endTs), anyInt()); + verify(tsDao).getPartitionsFromDB(tenantId, query, tenantId, startTs, endTs); + } + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java index 0425b23a1e..aa3e48c73f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -112,4 +112,16 @@ public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest { ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime())); } + @Test + public void testEstimatePartitionCount() throws ParseException { + assertThat(tsDao.estimatePartitionCount(0, Long.MAX_VALUE)).as("centuries").isEqualTo(292_277_026L); + assertThat(tsDao.estimatePartitionCount(0, 0)).as("single").isEqualTo(1L); + long startTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + assertThat(tsDao.estimatePartitionCount(startTs, endTs)).as("2 years + 2 spare periods").isEqualTo(2 + 2); + assertThat(tsDao.estimatePartitionCount(endTs, startTs)).as("wrong period estimated as 1").isEqualTo(1L); + } + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDateTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDateTest.java new file mode 100644 index 0000000000..f860d6ad51 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/NoSqlTsPartitionDateTest.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.assertj.core.api.Assertions.assertThat; + +class NoSqlTsPartitionDateTest { + + @ParameterizedTest + @EnumSource(NoSqlTsPartitionDate.class) + void getDurationMsTest(NoSqlTsPartitionDate tsPartitionDate) throws Exception { + final Long durationMs = switch (tsPartitionDate) { + case MINUTES -> 60000L; + case HOURS -> 3600000L; + case DAYS -> 86400000L; + case MONTHS -> 2629746000L; + case YEARS -> 31556952000L; + case INDEFINITE -> Long.MAX_VALUE; + default -> null; //should be here in case a new enum value will be added in future + }; + assertThat(durationMs).isNotNull(); + assertThat(tsPartitionDate.getDurationMs()).isEqualTo(durationMs); + } + +}