Merge pull request #13107 from irynamatveieva/fix-ts-deletion
Fixed time series redis cache
This commit is contained in:
commit
40e64ef40b
@ -33,18 +33,14 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.stats.DefaultCounter;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.cache.CacheExecutorService;
|
||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
|
||||
import org.thingsboard.server.dao.timeseries.TsLatestCacheKey;
|
||||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@Slf4j
|
||||
|
||||
@ -189,7 +189,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
||||
}
|
||||
|
||||
|
||||
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query, Long version) {
|
||||
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
|
||||
return Futures.transformAsync(future, entryList -> {
|
||||
if (entryList.size() == 1) {
|
||||
@ -198,7 +198,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
||||
} else {
|
||||
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
||||
}
|
||||
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true));
|
||||
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true, version));
|
||||
}, service);
|
||||
}
|
||||
|
||||
@ -241,7 +241,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
||||
rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), keyDictionaryDao.getOrSaveKeyId(query.getKey())));
|
||||
isRemoved = true;
|
||||
if (query.getRewriteLatestIfDeleted()) {
|
||||
return getNewLatestEntryFuture(tenantId, entityId, query);
|
||||
return getNewLatestEntryFuture(tenantId, entityId, query, version);
|
||||
}
|
||||
}
|
||||
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved, version));
|
||||
|
||||
@ -81,21 +81,21 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
||||
|
||||
protected static final int MAX_TIMEOUT = 30;
|
||||
|
||||
private static final String STRING_KEY = "stringKey";
|
||||
protected static final String STRING_KEY = "stringKey";
|
||||
private static final String LONG_KEY = "longKey";
|
||||
private static final String DOUBLE_KEY = "doubleKey";
|
||||
private static final String BOOLEAN_KEY = "booleanKey";
|
||||
|
||||
private static final long TS = 42L;
|
||||
protected static final long TS = 42L;
|
||||
private static final String DESC_ORDER = "DESC";
|
||||
|
||||
KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
|
||||
protected KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
|
||||
KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
|
||||
KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE);
|
||||
KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE);
|
||||
|
||||
protected TenantId tenantId;
|
||||
DeviceId deviceId = new DeviceId(Uuids.timeBased());
|
||||
protected DeviceId deviceId = new DeviceId(Uuids.timeBased());
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
@ -811,11 +811,11 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
||||
tsService.saveWithoutLatest(tenantId, deviceId, tsKvEntry, 0).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private static TsKvEntry toTsEntry(long ts, KvEntry entry) {
|
||||
protected static TsKvEntry toTsEntry(long ts, KvEntry entry) {
|
||||
return new BasicTsKvEntry(ts, entry);
|
||||
}
|
||||
|
||||
private static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) {
|
||||
protected static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) {
|
||||
assertEquals(expected.getKey(), actual.getKey());
|
||||
assertEquals(expected.getValue(), actual.getValue());
|
||||
assertEquals(expected.getTs(), actual.getTs());
|
||||
|
||||
@ -15,9 +15,42 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.service.timeseries.sql;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@DaoSqlTest
|
||||
public class TimeseriesServiceSqlTest extends BaseTimeseriesServiceTest {
|
||||
|
||||
@Test
|
||||
public void testRemoveLatestAndNoValuePresentInDB() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
TsKvEntry tsKvEntry = toTsEntry(TS, stringKvEntry);
|
||||
tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
||||
|
||||
Optional<TsKvEntry> tsKvEntryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(tsKvEntryOpt).isPresent();
|
||||
equalsIgnoreVersion(tsKvEntry, tsKvEntryOpt.get());
|
||||
assertThat(tsKvEntryOpt.get().getVersion()).isNotNull();
|
||||
|
||||
tsService.removeLatest(tenantId, deviceId, List.of(STRING_KEY));
|
||||
|
||||
await().alias("Wait until ts last is removed from the cache").atMost(MAX_TIMEOUT, TimeUnit.SECONDS)
|
||||
.pollInterval(1, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
Optional<TsKvEntry> tsKvEntryAfterRemoval = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
||||
assertThat(tsKvEntryAfterRemoval).isNotPresent();
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user