calculatePartitions improvements and added tests for cassandra ts dao

This commit is contained in:
YevhenBondarenko 2022-11-16 15:44:16 +01:00
parent 293a7ab078
commit 8a27614936
5 changed files with 128 additions and 24 deletions

View File

@ -107,7 +107,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private boolean setNullValuesEnabled;
private NoSqlTsPartitionDate tsFormat;
private long partitionDurationPlusOneMs;
private PreparedStatement partitionInsertStmt;
private PreparedStatement partitionInsertTtlStmt;
@ -132,9 +131,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
if (!isFixedPartitioning()) {
partitionDurationPlusOneMs = partitionToMs(tsFormat) + 1;
}
if (!isFixedPartitioning() && partitionsCacheSize > 0) {
cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize);
}
@ -144,18 +140,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
}
long partitionToMs(final NoSqlTsPartitionDate partition) {
switch (partition) {
case MINUTES: return TimeUnit.MINUTES.toMillis(1);
case HOURS: return TimeUnit.HOURS.toMillis(1);
case DAYS: return TimeUnit.DAYS.toMillis(1);
case MONTHS: return TimeUnit.DAYS.toMillis(31); //the longest month
case YEARS: return TimeUnit.DAYS.toMillis(366); // leap year
}
log.warn("Can not convert partition to milliseconds. There are no mapping [{}] for partitioning [{}]", partition, partitioning);
throw new RuntimeException("Failed to convert partition to ms: " + partitioning + "!");
}
@PreDestroy
public void stop() {
super.stopExecutor();
@ -452,7 +436,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
partitions.add(minPartition);
long currentPartition = minPartition;
while (maxPartition > (currentPartition = toPartitionTs(currentPartition + partitionDurationPlusOneMs))){
while (maxPartition > (currentPartition = calculateNextPartition(currentPartition))) {
partitions.add(currentPartition);
}
@ -460,6 +444,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return partitions;
}
private long calculateNextPartition(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return time.plus(1, tsFormat.getTruncateUnit()).toInstant(ZoneOffset.UTC).toEpochMilli();
}
private AsyncFunction<List<Long>, List<TbResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> {
try {

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
@ -84,10 +83,33 @@ public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest {
}
@Ignore //TODO make test for Days
@Test
public void testCalculatePartitionsDays() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T23:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(6).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-13T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-14T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime()));
}
}

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
@ -83,11 +82,53 @@ public class CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest {
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:00:00Z").getTime());
}
@Ignore //TODO make test for Hours
@Test
public void testCalculatePartitionsHours() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:59:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(25).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T04:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T05:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T06:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T07:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T08:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T09:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T10:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T11:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T12:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T13:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T14:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T15:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T16:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T17:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T18:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T19:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T20:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T21:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T22:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T23:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime()));
}
}

View File

@ -84,10 +84,38 @@ public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest {
}
@Ignore //TODO make test for Minutes
@Test
public void testCalculatePartitionsMinutes() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(11).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:03:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:04:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:05:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:06:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:07:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:08:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:09:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()));
}
}

View File

@ -83,10 +83,34 @@ public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest {
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime());
}
@Ignore //TODO make test for Years
@Test
public void testCalculatePartitionsYears() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-10-12T23:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-07-15T00:00:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(7).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2024-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime()));
}
}