From 0141787d0895543096c93deec60cd5bdecf21db2 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 17 Jun 2022 19:15:44 +0300 Subject: [PATCH] RedisTbTransactionalCache - redis cluster watch support by a single key --- .../cache/RedisTbTransactionalCache.java | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index ed39bf3716..2cf326905c 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -21,20 +21,27 @@ import org.springframework.cache.support.NullValue; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.connection.jedis.JedisClusterConnection; +import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.util.JedisClusterCRC16; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; @Slf4j public abstract class RedisTbTransactionalCache implements TbTransactionalCache { private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE); + static final JedisPool MOCK_POOL = new JedisPool(); //non-null pool required for JedisConnection to trigger closing jedis connection @Getter private final String cacheName; @@ -53,12 +60,12 @@ public abstract class RedisTbTransactionalCache x.get(cacheName)) + .map(CacheSpecs::getTimeToLiveInMinutes) + .map(t -> Expiration.from(t, TimeUnit.MINUTES)) + .orElseGet(Expiration::persistent); } @Override @@ -130,8 +137,24 @@ public abstract class RedisTbTransactionalCache(this, connection); } + RedisConnection getConnection(byte[] rawKey) { + RedisConnection connection = connectionFactory.getClusterConnection(); + if (!(connection instanceof JedisClusterConnection)) { + return connection; + } + + int slotNum = JedisClusterCRC16.getSlot(rawKey); + Jedis jedis = ((JedisClusterConnection) connection).getNativeConnection().getConnectionFromSlot(slotNum); + + JedisConnection jedisConnection = new JedisConnection(jedis, MOCK_POOL, jedis.getDB()); + jedisConnection.setConvertPipelineAndTxResults(connectionFactory.getConvertPipelineAndTxResults()); + + return jedisConnection; + } + private RedisConnection watch(byte[][] rawKeysList) { - var connection = connectionFactory.getConnection(); + //TODO process keys only on suitable slot connection, see getConnection(byte[] rawKey) + RedisConnection connection = getConnection(rawKeysList[0]); try { connection.watch(rawKeysList); connection.multi();