diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index 35ef07f0ce..4277a7bab9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -51,6 +51,7 @@ public abstract class RedisTbTransactionalCache keySerializer = StringRedisSerializer.UTF_8; private final TbRedisSerializer valueSerializer; diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index 61d2c8bdb2..e9f9ab522c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -15,12 +15,15 @@ */ package org.thingsboard.server.dao.timeseries; +import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.NotImplementedException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.RedisTbTransactionalCache; @@ -32,33 +35,42 @@ import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") @Slf4j public class TsLatestRedisCache extends RedisTbTransactionalCache { + static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" + + "redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " + + "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " + + "local current_size = redis.call('ZCARD', KEYS[1]); " + + "if current_size > 1 then" + + " redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " + + "end;"); + static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988"); + public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); } - @Override - public void put(TsLatestCacheKey key, TsKvEntry value) { - log.trace("put [{}][{}]", key, value); - final byte[] rawKey = getRawKey(key); - try (var connection = getConnection(rawKey)) { - put(connection, rawKey, value, RedisStringCommands.SetOption.UPSERT); + @PostConstruct + public void init() { + try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) { + log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection()); + } + } catch (Throwable t) { + log.error("Error on Redis TS Latest cache init", t); } } - @Override - public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { - log.trace("putIfAbsent [{}][{}]", key, value); - throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); - } - @Override public TbCacheValueWrapper get(TsLatestCacheKey key) { log.debug("get [{}]", key); @@ -68,11 +80,38 @@ public class TsLatestRedisCache @Override protected byte[] doGet(RedisConnection connection, byte[] rawKey) { log.trace("doGet [{}][{}]", connection, rawKey); - return connection.stringCommands().get(rawKey); + Set values = connection.commands().zRange(rawKey, -1, -1); + return values == null ? null : values.stream().findFirst().orElse(null); } @Override - public void evict(TsLatestCacheKey key){ + public void put(TsLatestCacheKey key, TsKvEntry value) { + log.trace("put [{}][{}]", key, value); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + byte[] rawValue = getRawValue(value); + byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs())); + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException e) { + log.debug("loading LUA [{}]", connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha); + } + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("Slowly executing eval instead of fast evalsha"); + connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } + + } + } + } + + @Override + public void evict(TsLatestCacheKey key) { log.trace("evict [{}]", key); final byte[] rawKey = getRawKey(key); try (var connection = getConnection(rawKey)) { @@ -80,6 +119,12 @@ public class TsLatestRedisCache } } + @Override + public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); + } + @Override public void evict(Collection keys) { throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java new file mode 100644 index 0000000000..5de26c15d3 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java @@ -0,0 +1,30 @@ +package org.thingsboard.server.dao.timeseries; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import java.security.MessageDigest; + +import static org.assertj.core.api.Assertions.assertThat; + +class TsLatestRedisCacheTest { + + @Test + void testUpsertTsLatestLUAScriptHash() { + assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA)); + } + + @SneakyThrows + String getSHA1(byte[] script) { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + byte[] hash = md.digest(script); + + StringBuilder sb = new StringBuilder(); + for (byte b : hash) { + sb.append(String.format("%02x", b)); + } + + return sb.toString(); + } + +} \ No newline at end of file