From 0b51baf38de1439224ecab046e8de938053e62de Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 3 Jul 2024 16:03:19 +0200 Subject: [PATCH] LUA script improvements and used versioned cache for latest ts --- .../cache/RedisTbTransactionalCache.java | 2 +- .../server/cache/VersionedRedisTbCache.java | 16 ++- .../server/cache}/TsLatestRedisCacheTest.java | 6 +- .../server/common/data/HasVersion.java | 2 +- .../common/data/kv/BaseAttributeKvEntry.java | 2 +- .../server/common/data/kv/BasicTsKvEntry.java | 13 ++ .../server/common/data/kv/TsKvEntry.java | 3 +- .../data/kv/TsKvLatestRemovingResult.java | 11 +- .../dao/model/sql/AbstractTsKvEntity.java | 7 +- .../model/sqlts/latest/TsKvLatestEntity.java | 11 +- ...paAbstractDaoListeningExecutorService.java | 4 + .../dao/sql/attributes/JpaAttributeDao.java | 5 +- .../CachedRedisSqlTimeseriesLatestDao.java | 51 +++---- .../dao/sqlts/SqlTimeseriesLatestDao.java | 12 +- .../latest/SearchTsKvLatestRepository.java | 2 +- .../CassandraBaseTimeseriesLatestDao.java | 2 +- .../dao/timeseries/TsLatestRedisCache.java | 127 +++--------------- .../attributes/BaseAttributesServiceTest.java | 3 - 18 files changed, 106 insertions(+), 173 deletions(-) rename {dao/src/test/java/org/thingsboard/server/dao/timeseries => common/cache/src/test/java/org/thingsboard/server/cache}/TsLatestRedisCacheTest.java (83%) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index dd371d96b6..9e51795be6 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -80,7 +80,7 @@ public abstract class RedisTbTransactionalCacheI8", newVersion) .. newValue:sub(9) + local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue redis.call('SET', key, newValueWithVersion, 'EX', expiration) end + + local function bytes_to_number(bytes) + local n = 0 + for i = 1, 8 do + n = n * 256 + string.byte(bytes, i) + end + return n + end -- Get the current version (first 8 bytes) of the current value local currentVersionBytes = redis.call('GETRANGE', key, 0, 7) if currentVersionBytes and #currentVersionBytes == 8 then - local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes)) + local currentVersion = bytes_to_number(currentVersionBytes) - if newVersion > currentVersion then + if newVersion > currentVersion or newVersion == 1 and currentVersion > 1 then setNewValue() end else @@ -62,7 +70,7 @@ public abstract class VersionedRedisTbCache valueSerializer) { super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java b/common/cache/src/test/java/org/thingsboard/server/cache/TsLatestRedisCacheTest.java similarity index 83% rename from dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java rename to common/cache/src/test/java/org/thingsboard/server/cache/TsLatestRedisCacheTest.java index c463b4630f..d0f3042c61 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java +++ b/common/cache/src/test/java/org/thingsboard/server/cache/TsLatestRedisCacheTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.timeseries; +package org.thingsboard.server.cache; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; @@ -22,11 +22,11 @@ import java.security.MessageDigest; import static org.assertj.core.api.Assertions.assertThat; -class TsLatestRedisCacheTest { +class VersionedRedisTbCacheTest { @Test void testUpsertTsLatestLUAScriptHash() { - assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA)); + assertThat(getSHA1(VersionedRedisTbCache.SET_VERSIONED_VALUE_LUA_SCRIPT)).isEqualTo(new String(VersionedRedisTbCache.SET_VERSIONED_VALUE_SHA)); } @SneakyThrows diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java index fbeb8ef3b6..b9af6f93c9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java @@ -16,5 +16,5 @@ package org.thingsboard.server.common.data; public interface HasVersion { - long getVersion(); + Long getVersion(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java index 6d48693511..001e061aa9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java @@ -98,7 +98,7 @@ public class BaseAttributeKvEntry implements AttributeKvEntry { } @Override - public long getVersion() { + public Long getVersion() { return version; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java index 396d1df5ba..ab39498ae0 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java @@ -25,9 +25,18 @@ public class BasicTsKvEntry implements TsKvEntry { @Valid private final KvEntry kv; + private final Long version; + public BasicTsKvEntry(long ts, KvEntry kv) { this.ts = ts; this.kv = kv; + this.version = null; + } + + public BasicTsKvEntry(long ts, KvEntry kv, Long version) { + this.ts = ts; + this.kv = kv; + this.version = version; } @Override @@ -118,4 +127,8 @@ public class BasicTsKvEntry implements TsKvEntry { return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT); } + @Override + public Long getVersion() { + return version; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvEntry.java index c65f48e550..4b8887e893 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvEntry.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.data.kv; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.thingsboard.server.common.data.HasVersion; import org.thingsboard.server.common.data.query.TsValue; /** @@ -24,7 +25,7 @@ import org.thingsboard.server.common.data.query.TsValue; * @author ashvayka * */ -public interface TsKvEntry extends KvEntry { +public interface TsKvEntry extends KvEntry, HasVersion { long getTs(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java index acdb745ad8..dae91bdd81 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java @@ -22,15 +22,22 @@ public class TsKvLatestRemovingResult { private String key; private TsKvEntry data; private boolean removed; + private Long version; - public TsKvLatestRemovingResult(TsKvEntry data) { + public TsKvLatestRemovingResult(String key, boolean removed) { + this(key, removed, null); + } + + public TsKvLatestRemovingResult(TsKvEntry data, Long version) { this.key = data.getKey(); this.data = data; this.removed = true; + this.version = version; } - public TsKvLatestRemovingResult(String key, boolean removed) { + public TsKvLatestRemovingResult(String key, boolean removed, Long version) { this.key = key; this.removed = removed; + this.version = version; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java index a0a4664829..ae4f47a37a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbstractTsKvEntity.java @@ -119,10 +119,13 @@ public abstract class AbstractTsKvEntity implements ToData { } if (aggValuesCount == null) { - return new BasicTsKvEntry(ts, kvEntry); + return new BasicTsKvEntry(ts, kvEntry, getVersion()); } else { return new AggTsKvEntry(ts, kvEntry, aggValuesCount); } } -} \ No newline at end of file + public Long getVersion() { + return null; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java index 0d34bb5195..23fe580fa3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.model.sqlts.latest; +import jakarta.persistence.Column; import jakarta.persistence.ColumnResult; import jakarta.persistence.ConstructorResult; import jakarta.persistence.Entity; @@ -30,6 +31,8 @@ import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; import java.util.UUID; +import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN; + @Data @Entity @Table(name = "ts_kv_latest") @@ -50,7 +53,7 @@ import java.util.UUID; @ColumnResult(name = "doubleValue", type = Double.class), @ColumnResult(name = "jsonValue", type = String.class), @ColumnResult(name = "ts", type = Long.class), - + @ColumnResult(name = "version", type = Long.class) } ), }) @@ -65,6 +68,9 @@ import java.util.UUID; }) public final class TsKvLatestEntity extends AbstractTsKvEntity { + @Column(name = VERSION_COLUMN) + private Long version; + @Override public boolean isNotEmpty() { return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null; @@ -73,7 +79,7 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity { public TsKvLatestEntity() { } - public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, String jsonValue, Long ts) { + public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, String jsonValue, Long ts, Long version) { this.entityId = entityId; this.key = key; this.ts = ts; @@ -83,5 +89,6 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity { this.booleanValue = boolValue; this.jsonValue = jsonValue; this.strKey = strKey; + this.version = version; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index b85e1bc724..3ac31bf831 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.support.TransactionTemplate; import javax.sql.DataSource; import java.sql.SQLException; @@ -36,6 +37,9 @@ public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected JdbcTemplate jdbcTemplate; + @Autowired + protected TransactionTemplate transactionTemplate; + protected void printWarnings(Statement statement) throws SQLException { SQLWarning warnings = statement.getWarnings(); if (warnings != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 172940d21d..a0e1f1447d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -206,15 +206,14 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl return futuresList; } - @Transactional @Override public List>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys) { List>> futuresList = new ArrayList<>(keys.size()); for (String key : keys) { futuresList.add(service.submit(() -> { - Long version = jdbcTemplate.query("DELETE FROM attribute_kv WHERE entity_id = ? AND attribute_type = ? " + + Long version = transactionTemplate.execute(status -> jdbcTemplate.query("DELETE FROM attribute_kv WHERE entity_id = ? AND attribute_type = ? " + "AND attribute_key = ? RETURNING nextval('attribute_kv_version_seq')", - rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key)); + rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key))); return TbPair.of(key, version); })); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index 32d0ee91c9..42dc149342 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; import org.thingsboard.server.cache.TbCacheValueWrapper; -import org.thingsboard.server.cache.TbTransactionalCache; +import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -52,7 +52,7 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries final CacheExecutorService cacheExecutorService; final SqlTimeseriesLatestDao sqlDao; final StatsFactory statsFactory; - final TbTransactionalCache cache; + final VersionedTbCache cache; DefaultCounter hitCounter; DefaultCounter missCounter; @@ -64,17 +64,17 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries } @Override - public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { - ListenableFuture future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + ListenableFuture future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); future = Futures.transform(future, x -> { - cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry); + cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry, x); return x; }, cacheExecutorService); if (log.isTraceEnabled()) { Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(Void result) { + public void onSuccess(Long result) { log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); } @@ -91,7 +91,15 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture future = sqlDao.removeLatest(tenantId, entityId, query); future = Futures.transform(future, x -> { - cache.evict(new TsLatestCacheKey(entityId, query.getKey())); + if (x.isRemoved()) { + TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey()); + Long version = x.getVersion(); + if (x.getData() != null) { + cache.put(key, x.getData(), version); + } else { + cache.evict(key, version); + } + } return x; }, cacheExecutorService); @@ -133,32 +141,11 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries return Futures.immediateFuture(Optional.ofNullable(tsKvEntry)); } log.debug("findLatest cache miss [{}][{}]", entityId, key); - ListenableFuture> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key); + ListenableFuture> daoFuture = sqlDao.findLatestOpt(tenantId, entityId, key); - return Futures.transformAsync(daoFuture, (daoValue) -> { - - if (daoValue.isEmpty()) { - //TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side - return Futures.immediateFuture(daoValue); - } - ListenableFuture> cachePutFuture = cacheExecutorService.submit(() -> { - cache.put(new TsLatestCacheKey(entityId, key), daoValue.get()); - return daoValue; - }); - - Futures.addCallback(cachePutFuture, new FutureCallback<>() { - @Override - public void onSuccess(Optional result) { - log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result); - } - - @Override - public void onFailure(Throwable t) { - log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t); - } - - }, MoreExecutors.directExecutor()); - return cachePutFuture; + return Futures.transform(daoFuture, daoValue -> { + cache.put(cacheKey, daoValue.orElse(null)); + return daoValue; }, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index edb0e535d1..44859c26e7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -190,7 +190,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { TsKvEntry entry = entryList.get(0); - return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor()); + return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry, v), MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } @@ -229,18 +229,18 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); } boolean isRemoved = false; + Long version = null; long ts = latest.getTs(); if (ts >= query.getStartTs() && ts < query.getEndTs()) { - TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityId(entityId.getId()); - latestEntity.setKey(keyDictionaryDao.getOrSaveKeyId(query.getKey())); - tsKvLatestRepository.delete(latestEntity); + version = transactionTemplate.execute(status -> jdbcTemplate.query("DELETE FROM ts_kv_latest WHERE entity_id = ? " + + "AND key = ? RETURNING nextval('ts_kv_latest_version_seq')", + rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), keyDictionaryDao.getOrSaveKeyId(query.getKey()))); isRemoved = true; if (query.getRewriteLatestIfDeleted()) { return getNewLatestEntryFuture(tenantId, entityId, query); } } - return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved, version)); }, MoreExecutors.directExecutor()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java index 50652d4f60..5e9e3369a5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java @@ -31,7 +31,7 @@ public class SearchTsKvLatestRepository { public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId"; public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, key_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," + - " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " + + " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts, ts_kv_latest.version AS version FROM ts_kv_latest " + "INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)"; @PersistenceContext diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index f774f18dc7..01c91d4801 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -161,7 +161,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes var entryList = result.getData(); if (entryList.size() == 1) { TsKvEntry entry = entryList.get(0); - return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor()); + return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry, v), MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index e9f9ab522c..851d66ae0c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -15,134 +15,41 @@ */ package org.thingsboard.server.dao.timeseries; -import jakarta.annotation.PostConstruct; +import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.NotImplementedException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.dao.InvalidDataAccessApiUsageException; -import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.ReturnType; -import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.data.redis.serializer.SerializationException; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; -import org.thingsboard.server.cache.RedisTbTransactionalCache; import org.thingsboard.server.cache.TBRedisCacheConfiguration; -import org.thingsboard.server.cache.TbCacheTransaction; -import org.thingsboard.server.cache.TbCacheValueWrapper; -import org.thingsboard.server.cache.TbJavaRedisSerializer; +import org.thingsboard.server.cache.TbRedisSerializer; +import org.thingsboard.server.cache.VersionedRedisTbCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Set; +import org.thingsboard.server.common.util.KvProtoUtil; +import org.thingsboard.server.gen.transport.TransportProtos; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") @Slf4j -public class TsLatestRedisCache extends RedisTbTransactionalCache { - - static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" + - "redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " + - "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " + - "local current_size = redis.call('ZCARD', KEYS[1]); " + - "if current_size > 1 then" + - " redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " + - "end;"); - static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988"); +public class TsLatestRedisCache extends VersionedRedisTbCache { public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { - super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); - } - - @PostConstruct - public void init() { - try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) { - log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection()); - String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); - if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { - log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection()); + super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { + @Override + public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException { + return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry).toByteArray(); } - } catch (Throwable t) { - log.error("Error on Redis TS Latest cache init", t); - } - } - @Override - public TbCacheValueWrapper get(TsLatestCacheKey key) { - log.debug("get [{}]", key); - return super.get(key); - } - - @Override - protected byte[] doGet(RedisConnection connection, byte[] rawKey) { - log.trace("doGet [{}][{}]", connection, rawKey); - Set values = connection.commands().zRange(rawKey, -1, -1); - return values == null ? null : values.stream().findFirst().orElse(null); - } - - @Override - public void put(TsLatestCacheKey key, TsKvEntry value) { - log.trace("put [{}][{}]", key, value); - final byte[] rawKey = getRawKey(key); - try (var connection = getConnection(rawKey)) { - byte[] rawValue = getRawValue(value); - byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs())); - try { - connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); - } catch (InvalidDataAccessApiUsageException e) { - log.debug("loading LUA [{}]", connection.getNativeConnection()); - String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); - if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { - log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha); - } + @Override + public TsKvEntry deserialize(TsLatestCacheKey key, byte[] bytes) throws SerializationException { try { - connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); - } catch (InvalidDataAccessApiUsageException ignored) { - log.debug("Slowly executing eval instead of fast evalsha"); - connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue); + return KvProtoUtil.fromTsKvProto(TransportProtos.TsKvProto.parseFrom(bytes)); + } catch (InvalidProtocolBufferException e) { + throw new SerializationException(e.getMessage()); } - } - } + }); } - - @Override - public void evict(TsLatestCacheKey key) { - log.trace("evict [{}]", key); - final byte[] rawKey = getRawKey(key); - try (var connection = getConnection(rawKey)) { - connection.keyCommands().del(rawKey); - } - } - - @Override - public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { - log.trace("putIfAbsent [{}][{}]", key, value); - throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); - } - - @Override - public void evict(Collection keys) { - throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); - } - - @Override - public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) { - throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache"); - } - - @Override - public TbCacheTransaction newTransactionForKey(TsLatestCacheKey key) { - throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache"); - } - - @Override - public TbCacheTransaction newTransactionForKeys(List keys) { - throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache"); - } - } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java index ad2470a9b1..0603f4e3f6 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java @@ -59,9 +59,6 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { private static final String OLD_VALUE = "OLD VALUE"; private static final String NEW_VALUE = "NEW VALUE"; - @Autowired - private VersionedTbCache cache; - @Autowired private AttributesService attributesService;