From 81bf9598fc0df360d17614f5ac0101b143b3e2a4 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 4 Apr 2025 13:18:41 +0300 Subject: [PATCH 1/3] fixed time series cache --- .../server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java | 4 ---- .../server/dao/sqlts/SqlTimeseriesLatestDao.java | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index 6e43034a44..d45182442a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -33,18 +33,14 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.cache.CacheExecutorService; -import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.timeseries.TsLatestCacheKey; import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis; import java.util.List; -import java.util.Map; import java.util.Optional; @Slf4j diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index e8ef37b3b5..c546fc21ea 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -189,7 +189,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query, Long version) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { @@ -198,7 +198,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } - return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true)); + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true, version)); }, service); } @@ -241,7 +241,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), keyDictionaryDao.getOrSaveKeyId(query.getKey()))); isRemoved = true; if (query.getRewriteLatestIfDeleted()) { - return getNewLatestEntryFuture(tenantId, entityId, query); + return getNewLatestEntryFuture(tenantId, entityId, query, version); } } return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved, version)); From 19e9145183f72e3fd17c4d5c522e95506bcba2be Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 7 Apr 2025 09:55:23 +0300 Subject: [PATCH 2/3] added test --- .../timeseries/BaseTimeseriesServiceTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 6a38159804..07df14ce03 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -768,6 +769,27 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { testFindAllByQueriesWithAggregationAndInvalidInterval(-1); } + @Test + public void testRemoveLatestAndNoValuePresentInDB() throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry tsKvEntry = toTsEntry(TS, stringKvEntry); + tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + Optional tsKvEntryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + assertThat(tsKvEntryOpt).isPresent(); + equalsIgnoreVersion(tsKvEntry, tsKvEntryOpt.get()); + assertThat(tsKvEntryOpt.get().getVersion()).isNotNull(); + + tsService.removeLatest(tenantId, deviceId, List.of(STRING_KEY)); + + await().alias("Wait until ts last is removed from the cache").atMost(MAX_TIMEOUT, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + Optional tsKvEntryAfterRemoval = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(tsKvEntryAfterRemoval).isNotPresent(); + }); + } + private void testFindAllByQueriesWithAggregationAndInvalidInterval(long interval) { BaseReadTsKvQuery query = new BaseReadTsKvQuery(STRING_KEY, TS, TS, interval, 1000, Aggregation.SUM, "DESC"); Assert.assertThrows(IncorrectParameterException.class, () -> findAndVerifyQueryId(deviceId, query)); From f3f316471cb0cbaa54015f4de885909104a25cbf Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 7 Apr 2025 12:11:13 +0300 Subject: [PATCH 3/3] moved test --- .../timeseries/BaseTimeseriesServiceTest.java | 34 ++++--------------- .../sql/TimeseriesServiceSqlTest.java | 33 ++++++++++++++++++ 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 07df14ce03..56b835c528 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -60,7 +60,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -82,21 +81,21 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { protected static final int MAX_TIMEOUT = 30; - private static final String STRING_KEY = "stringKey"; + protected static final String STRING_KEY = "stringKey"; private static final String LONG_KEY = "longKey"; private static final String DOUBLE_KEY = "doubleKey"; private static final String BOOLEAN_KEY = "booleanKey"; - private static final long TS = 42L; + protected static final long TS = 42L; private static final String DESC_ORDER = "DESC"; - KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value"); + protected KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value"); KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE); KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE); KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); protected TenantId tenantId; - DeviceId deviceId = new DeviceId(Uuids.timeBased()); + protected DeviceId deviceId = new DeviceId(Uuids.timeBased()); @Before public void before() { @@ -769,27 +768,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { testFindAllByQueriesWithAggregationAndInvalidInterval(-1); } - @Test - public void testRemoveLatestAndNoValuePresentInDB() throws ExecutionException, InterruptedException, TimeoutException { - TsKvEntry tsKvEntry = toTsEntry(TS, stringKvEntry); - tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); - - Optional tsKvEntryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); - - assertThat(tsKvEntryOpt).isPresent(); - equalsIgnoreVersion(tsKvEntry, tsKvEntryOpt.get()); - assertThat(tsKvEntryOpt.get().getVersion()).isNotNull(); - - tsService.removeLatest(tenantId, deviceId, List.of(STRING_KEY)); - - await().alias("Wait until ts last is removed from the cache").atMost(MAX_TIMEOUT, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) - .untilAsserted(() -> { - Optional tsKvEntryAfterRemoval = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); - assertThat(tsKvEntryAfterRemoval).isNotPresent(); - }); - } - private void testFindAllByQueriesWithAggregationAndInvalidInterval(long interval) { BaseReadTsKvQuery query = new BaseReadTsKvQuery(STRING_KEY, TS, TS, interval, 1000, Aggregation.SUM, "DESC"); Assert.assertThrows(IncorrectParameterException.class, () -> findAndVerifyQueryId(deviceId, query)); @@ -833,11 +811,11 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { tsService.saveWithoutLatest(tenantId, deviceId, tsKvEntry, 0).get(MAX_TIMEOUT, TimeUnit.SECONDS); } - private static TsKvEntry toTsEntry(long ts, KvEntry entry) { + protected static TsKvEntry toTsEntry(long ts, KvEntry entry) { return new BasicTsKvEntry(ts, entry); } - private static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) { + protected static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) { assertEquals(expected.getKey(), actual.getKey()); assertEquals(expected.getValue(), actual.getValue()); assertEquals(expected.getTs(), actual.getTs()); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/TimeseriesServiceSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/TimeseriesServiceSqlTest.java index 06a81adac3..7fa5b5f7e1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/TimeseriesServiceSqlTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/TimeseriesServiceSqlTest.java @@ -15,9 +15,42 @@ */ package org.thingsboard.server.dao.service.timeseries.sql; +import org.junit.Test; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + @DaoSqlTest public class TimeseriesServiceSqlTest extends BaseTimeseriesServiceTest { + + @Test + public void testRemoveLatestAndNoValuePresentInDB() throws ExecutionException, InterruptedException, TimeoutException { + TsKvEntry tsKvEntry = toTsEntry(TS, stringKvEntry); + tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + Optional tsKvEntryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + assertThat(tsKvEntryOpt).isPresent(); + equalsIgnoreVersion(tsKvEntry, tsKvEntryOpt.get()); + assertThat(tsKvEntryOpt.get().getVersion()).isNotNull(); + + tsService.removeLatest(tenantId, deviceId, List.of(STRING_KEY)); + + await().alias("Wait until ts last is removed from the cache").atMost(MAX_TIMEOUT, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + Optional tsKvEntryAfterRemoval = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(tsKvEntryAfterRemoval).isNotPresent(); + }); + } + }