TsLatestRedisCache: added LUA upsert script. load script, eval, evalsha, test for script sha, fetch latest by zRange Redis command
This commit is contained in:
		
							parent
							
								
									c49d97f7d1
								
							
						
					
					
						commit
						a4469f6953
					
				@ -51,6 +51,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private final String cacheName;
 | 
					    private final String cacheName;
 | 
				
			||||||
 | 
					    @Getter
 | 
				
			||||||
    private final JedisConnectionFactory connectionFactory;
 | 
					    private final JedisConnectionFactory connectionFactory;
 | 
				
			||||||
    private final RedisSerializer<String> keySerializer = StringRedisSerializer.UTF_8;
 | 
					    private final RedisSerializer<String> keySerializer = StringRedisSerializer.UTF_8;
 | 
				
			||||||
    private final TbRedisSerializer<K, V> valueSerializer;
 | 
					    private final TbRedisSerializer<K, V> valueSerializer;
 | 
				
			||||||
 | 
				
			|||||||
@ -15,12 +15,15 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.server.dao.timeseries;
 | 
					package org.thingsboard.server.dao.timeseries;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import jakarta.annotation.PostConstruct;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.apache.commons.lang3.NotImplementedException;
 | 
					import org.apache.commons.lang3.NotImplementedException;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
				
			||||||
 | 
					import org.springframework.dao.InvalidDataAccessApiUsageException;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnection;
 | 
					import org.springframework.data.redis.connection.RedisConnection;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
					import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisStringCommands;
 | 
					import org.springframework.data.redis.connection.ReturnType;
 | 
				
			||||||
 | 
					import org.springframework.data.redis.serializer.StringRedisSerializer;
 | 
				
			||||||
import org.springframework.stereotype.Service;
 | 
					import org.springframework.stereotype.Service;
 | 
				
			||||||
import org.thingsboard.server.cache.CacheSpecsMap;
 | 
					import org.thingsboard.server.cache.CacheSpecsMap;
 | 
				
			||||||
import org.thingsboard.server.cache.RedisTbTransactionalCache;
 | 
					import org.thingsboard.server.cache.RedisTbTransactionalCache;
 | 
				
			||||||
@ -32,33 +35,42 @@ import org.thingsboard.server.common.data.CacheConstants;
 | 
				
			|||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
					import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.io.Serializable;
 | 
					import java.io.Serializable;
 | 
				
			||||||
 | 
					import java.util.Arrays;
 | 
				
			||||||
import java.util.Collection;
 | 
					import java.util.Collection;
 | 
				
			||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
 | 
					import java.util.Set;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
 | 
					@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
 | 
				
			||||||
@Service("TsLatestCache")
 | 
					@Service("TsLatestCache")
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
public class TsLatestRedisCache<K extends Serializable, V extends Serializable> extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
 | 
					public class TsLatestRedisCache<K extends Serializable, V extends Serializable> extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" +
 | 
				
			||||||
 | 
					            "redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " +
 | 
				
			||||||
 | 
					            "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " +
 | 
				
			||||||
 | 
					            "local current_size = redis.call('ZCARD', KEYS[1]); " +
 | 
				
			||||||
 | 
					            "if current_size > 1 then" +
 | 
				
			||||||
 | 
					            "  redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " +
 | 
				
			||||||
 | 
					            "end;");
 | 
				
			||||||
 | 
					    static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
 | 
					    public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
 | 
				
			||||||
        super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>());
 | 
					        super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @PostConstruct
 | 
				
			||||||
    public void put(TsLatestCacheKey key, TsKvEntry value) {
 | 
					    public void init() {
 | 
				
			||||||
        log.trace("put [{}][{}]", key, value);
 | 
					        try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) {
 | 
				
			||||||
        final byte[] rawKey = getRawKey(key);
 | 
					            log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection());
 | 
				
			||||||
        try (var connection = getConnection(rawKey)) {
 | 
					            String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
 | 
				
			||||||
            put(connection, rawKey, value, RedisStringCommands.SetOption.UPSERT);
 | 
					            if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
 | 
				
			||||||
 | 
					                log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection());
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        } catch (Throwable t) {
 | 
				
			||||||
 | 
					            log.error("Error on Redis TS Latest cache init", t);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					 | 
				
			||||||
    public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) {
 | 
					 | 
				
			||||||
        log.trace("putIfAbsent [{}][{}]", key, value);
 | 
					 | 
				
			||||||
        throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache");
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public TbCacheValueWrapper<TsKvEntry> get(TsLatestCacheKey key) {
 | 
					    public TbCacheValueWrapper<TsKvEntry> get(TsLatestCacheKey key) {
 | 
				
			||||||
        log.debug("get [{}]", key);
 | 
					        log.debug("get [{}]", key);
 | 
				
			||||||
@ -68,11 +80,38 @@ public class TsLatestRedisCache<K extends Serializable, V extends Serializable>
 | 
				
			|||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
 | 
					    protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
 | 
				
			||||||
        log.trace("doGet [{}][{}]", connection, rawKey);
 | 
					        log.trace("doGet [{}][{}]", connection, rawKey);
 | 
				
			||||||
        return connection.stringCommands().get(rawKey);
 | 
					        Set<byte[]> values = connection.commands().zRange(rawKey, -1, -1);
 | 
				
			||||||
 | 
					        return values == null ? null : values.stream().findFirst().orElse(null);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void evict(TsLatestCacheKey key){
 | 
					    public void put(TsLatestCacheKey key, TsKvEntry value) {
 | 
				
			||||||
 | 
					        log.trace("put [{}][{}]", key, value);
 | 
				
			||||||
 | 
					        final byte[] rawKey = getRawKey(key);
 | 
				
			||||||
 | 
					        try (var connection = getConnection(rawKey)) {
 | 
				
			||||||
 | 
					            byte[] rawValue = getRawValue(value);
 | 
				
			||||||
 | 
					            byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs()));
 | 
				
			||||||
 | 
					            try {
 | 
				
			||||||
 | 
					                connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
 | 
				
			||||||
 | 
					            } catch (InvalidDataAccessApiUsageException e) {
 | 
				
			||||||
 | 
					                log.debug("loading LUA [{}]", connection.getNativeConnection());
 | 
				
			||||||
 | 
					                String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
 | 
				
			||||||
 | 
					                if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
 | 
				
			||||||
 | 
					                    log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha);
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                try {
 | 
				
			||||||
 | 
					                    connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
 | 
				
			||||||
 | 
					                } catch (InvalidDataAccessApiUsageException ignored) {
 | 
				
			||||||
 | 
					                    log.debug("Slowly executing eval instead of fast evalsha");
 | 
				
			||||||
 | 
					                    connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue);
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void evict(TsLatestCacheKey key) {
 | 
				
			||||||
        log.trace("evict [{}]", key);
 | 
					        log.trace("evict [{}]", key);
 | 
				
			||||||
        final byte[] rawKey = getRawKey(key);
 | 
					        final byte[] rawKey = getRawKey(key);
 | 
				
			||||||
        try (var connection = getConnection(rawKey)) {
 | 
					        try (var connection = getConnection(rawKey)) {
 | 
				
			||||||
@ -80,6 +119,12 @@ public class TsLatestRedisCache<K extends Serializable, V extends Serializable>
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) {
 | 
				
			||||||
 | 
					        log.trace("putIfAbsent [{}][{}]", key, value);
 | 
				
			||||||
 | 
					        throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache");
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void evict(Collection<TsLatestCacheKey> keys) {
 | 
					    public void evict(Collection<TsLatestCacheKey> keys) {
 | 
				
			||||||
        throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache");
 | 
					        throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache");
 | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,30 @@
 | 
				
			|||||||
 | 
					package org.thingsboard.server.dao.timeseries;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import lombok.SneakyThrows;
 | 
				
			||||||
 | 
					import org.junit.jupiter.api.Test;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.security.MessageDigest;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import static org.assertj.core.api.Assertions.assertThat;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TsLatestRedisCacheTest {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    void testUpsertTsLatestLUAScriptHash() {
 | 
				
			||||||
 | 
					        assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SneakyThrows
 | 
				
			||||||
 | 
					    String getSHA1(byte[] script) {
 | 
				
			||||||
 | 
					        MessageDigest md = MessageDigest.getInstance("SHA-1");
 | 
				
			||||||
 | 
					        byte[] hash = md.digest(script);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        StringBuilder sb = new StringBuilder();
 | 
				
			||||||
 | 
					        for (byte b : hash) {
 | 
				
			||||||
 | 
					            sb.append(String.format("%02x", b));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return sb.toString();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user