RedisTbTransactionalCache - redis cluster watch support by a single key

This commit is contained in:
Sergey Matvienko 2022-06-17 19:15:44 +03:00 committed by Andrii Shvaika
parent 9a77fb8abd
commit 0141787d08

View File

@ -21,20 +21,27 @@ import org.springframework.cache.support.NullValue;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.util.JedisClusterCRC16;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class RedisTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
static final JedisPool MOCK_POOL = new JedisPool(); //non-null pool required for JedisConnection to trigger closing jedis connection
@Getter
private final String cacheName;
@ -53,12 +60,12 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
this.connectionFactory = connectionFactory;
this.valueSerializer = valueSerializer;
this.evictExpiration = Expiration.from(configuration.getEvictTtlInMs(), TimeUnit.MILLISECONDS);
if (cacheSpecsMap.getSpecs() != null && cacheSpecsMap.getSpecs().get(cacheName) != null) {
CacheSpecs cacheSpecs = cacheSpecsMap.getSpecs().get(cacheName);
this.cacheTtl = Expiration.from(cacheSpecs.getTimeToLiveInMinutes(), TimeUnit.MINUTES);
} else {
this.cacheTtl = Expiration.persistent();
}
this.cacheTtl = Optional.ofNullable(cacheSpecsMap)
.map(CacheSpecsMap::getSpecs)
.map(x -> x.get(cacheName))
.map(CacheSpecs::getTimeToLiveInMinutes)
.map(t -> Expiration.from(t, TimeUnit.MINUTES))
.orElseGet(Expiration::persistent);
}
@Override
@ -130,8 +137,24 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return new RedisTbCacheTransaction<>(this, connection);
}
RedisConnection getConnection(byte[] rawKey) {
RedisConnection connection = connectionFactory.getClusterConnection();
if (!(connection instanceof JedisClusterConnection)) {
return connection;
}
int slotNum = JedisClusterCRC16.getSlot(rawKey);
Jedis jedis = ((JedisClusterConnection) connection).getNativeConnection().getConnectionFromSlot(slotNum);
JedisConnection jedisConnection = new JedisConnection(jedis, MOCK_POOL, jedis.getDB());
jedisConnection.setConvertPipelineAndTxResults(connectionFactory.getConvertPipelineAndTxResults());
return jedisConnection;
}
private RedisConnection watch(byte[][] rawKeysList) {
var connection = connectionFactory.getConnection();
//TODO process keys only on suitable slot connection, see getConnection(byte[] rawKey)
RedisConnection connection = getConnection(rawKeysList[0]);
try {
connection.watch(rawKeysList);
connection.multi();